From 4ec74cb5d88f630b2465facb4bfa8ead596ecc96 Mon Sep 17 00:00:00 2001 From: Aleem Haji Date: Sat, 26 Sep 2020 16:59:19 -0400 Subject: [PATCH] Adds in the ability to run individual jobs on demand (#19) * Adds in the ability to run individual jobs on demand * Supes up the job running process to properly manage successful attaches. * Fixed up hope.yaml job definition to not reference local stuff. --- cmd/hope/deploy.go | 58 +-------------------- cmd/hope/root.go | 17 +++++- cmd/hope/run.go | 110 +++++++++++++++++++++++++++++++++++++++ cmd/hope/utils.go | 23 ++++++++ hope.yaml | 18 +++++++ pkg/envsubst/envsubst.go | 22 ++++++++ pkg/hope/deployment.go | 8 +++ pkg/hope/kubectl_jobs.go | 91 ++++++++++++++++++++++++++++---- pkg/kubeutil/kubeutil.go | 12 ++++- 9 files changed, 290 insertions(+), 69 deletions(-) create mode 100644 cmd/hope/run.go diff --git a/cmd/hope/deploy.go b/cmd/hope/deploy.go index 1be2f8f..6d3ea18 100644 --- a/cmd/hope/deploy.go +++ b/cmd/hope/deploy.go @@ -3,9 +3,7 @@ package cmd import ( "errors" "fmt" - "math" "strings" - "time" ) import ( @@ -18,12 +16,10 @@ import ( "github.com/Eagerod/hope/pkg/docker" "github.com/Eagerod/hope/pkg/envsubst" "github.com/Eagerod/hope/pkg/hope" - "github.com/Eagerod/hope/pkg/kubeutil" ) const MaximumJobDeploymentPollSeconds int = 60 -// rootCmd represents the base command when called without any subcommands var deployCmd = &cobra.Command{ Use: "deploy", Short: "Deploy a Kubernetes yaml file", @@ -140,58 +136,8 @@ var deployCmd = &cobra.Command{ return err } case ResourceTypeJob: - // Exponential backoff maxing out at 60 seconds. - // TODO: Implement maximum retries, or other throughput-related - // controls - // TODO: Fetch more detailed job status information to show on - // the console. - attempts := 1 - jobLogger := log.WithFields(log.Fields{}) - for ok := false; !ok; { - status, err := hope.GetJobStatus(jobLogger, kubectl, resource.Job) - if err != nil { - return err - } - - switch status { - case hope.JobStatusFailed: - return errors.New(fmt.Sprintf("Job %s failed.", resource.Job)) - case hope.JobStatusComplete: - log.Debug("Job ", resource.Job, " successful.") - ok = true - break - default: - // If the job is running, start polling for logs. - // Jobs that failed or completed long in the past may - // have had their pods wiped since they ran. - if err := hope.FollowLogsIfContainersRunning(kubectl, resource.Job); err != nil { - log.Warn(err) - attemptsDuration := math.Pow(2, float64(attempts-1)) - sleepSeconds := int(math.Min(attemptsDuration, float64(MaximumJobDeploymentPollSeconds))) - - if sleepSeconds == MaximumJobDeploymentPollSeconds { - log.Debug("Checking pod events for details...") - // Check the event log for the pods associated - // with this job. - // There may be something useful in there. - pods, err := hope.GetPodsForJob(kubectl, resource.Job) - if err != nil { - log.Warn(err) - continue - } - - for _, pods := range *pods { - involvedObject := fmt.Sprintf("involvedObject.name=%s", pods) - kubeutil.ExecKubectl(kubectl, "get", "events", "--field-selector", involvedObject) - } - } - - log.Warn("Failed to attach to logs for job ", resource.Job, ". 
Waiting ", sleepSeconds, " seconds and trying again.") - - time.Sleep(time.Second * time.Duration(sleepSeconds)) - attempts = attempts + 1 - } - } + if err := hope.FollowLogsAndPollUntilJobComplete(log.WithFields(log.Fields{}), kubectl, resource.Job, 10, 60); err != nil { + return err } default: return errors.New(fmt.Sprintf("Resource type (%s) not implemented.", resourceType)) diff --git a/cmd/hope/root.go b/cmd/hope/root.go index aa20fea..7f20384 100644 --- a/cmd/hope/root.go +++ b/cmd/hope/root.go @@ -46,12 +46,14 @@ func Execute() { rootCmd.AddCommand(hostnameCmd) rootCmd.AddCommand(kubeconfigCmd) rootCmd.AddCommand(resetCmd) + rootCmd.AddCommand(runCmd) rootCmd.AddCommand(sshCmd) rootCmd.AddCommand(tokenCmd) initHostnameCmdFlags() initKubeconfigCmdFlags() initResetCmd() + initRunCmdFlags() initSshCmd() initTokenCmd() @@ -129,7 +131,7 @@ func initLogger() { log.Error(configParseError) } - log.Debug("Using config file:", viper.ConfigFileUsed()) + log.Debug("Using config file: ", viper.ConfigFileUsed()) } func patchInvocations() { @@ -147,10 +149,21 @@ func patchInvocations() { oldEnvSubst := envsubst.GetEnvsubst envsubst.GetEnvsubst = func(str string) (string, error) { - log.Debug("echo **(", len(str), " chars)** | envsubst ") + log.Debug("echo **(", len(str), " chars)** | envsubst") return oldEnvSubst(str) } + oldEnvSubstArgs := envsubst.GetEnvsubstArgs + envsubst.GetEnvsubstArgs = func(args map[string]string, str string) (string, error) { + argsKeys := []string{} + for key, _ := range args { + argsKeys = append(argsKeys, fmt.Sprintf("$%s", key)) + } + + log.Debug("echo **(", len(str), " chars)** | envsubst ", strings.Join(argsKeys, ",")) + return oldEnvSubstArgs(args, str) + } + oldExecKubectl := kubeutil.ExecKubectl kubeutil.ExecKubectl = func(kubectl *kubeutil.Kubectl, args ...string) error { log.Debug("kubectl ", strings.Join(args, " ")) diff --git a/cmd/hope/run.go b/cmd/hope/run.go new file mode 100644 index 0000000..e5cb7e9 --- /dev/null +++ b/cmd/hope/run.go @@ -0,0 +1,110 @@ +package cmd + +import ( + "io/ioutil" + "errors" + "fmt" + "regexp" + "strings" +) + +import ( + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +import ( + "github.com/Eagerod/hope/pkg/envsubst" + "github.com/Eagerod/hope/pkg/hope" +) + +var runCmdParameterSlice *[]string + +func initRunCmdFlags() { + runCmdParameterSlice = runCmd.Flags().StringArrayP("param", "p", []string{}, "parameters to populate in the job yaml") +} + +var runCmd = &cobra.Command{ + Use: "run", + Short: "Execute and follow a Kubernetes job", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + jobName := args[0] + + job, err := getJob(jobName) + if err != nil { + return err + } + + remainingParams := map[string]bool{} + for _, param := range job.Parameters { + remainingParams[param] = true + } + + params := map[string]string{} + for _, param := range *runCmdParameterSlice { + components := strings.Split(param, "=") + if len(components) != 2 { + return errors.New(fmt.Sprintf("Failed to parse argument: %s", param)) + } + + paramName := components[0] + paramValue := components[1] + + params[paramName] = paramValue + if _, ok := remainingParams[paramName]; !ok { + return errors.New(fmt.Sprintf("Parameter: %s not recognized", paramName)) + } + + remainingParams[paramName] = false + } + + for param, missed := range remainingParams { + if missed { + return errors.New(fmt.Sprintf("Failed to find parameter: %s", param)) + } + } + + // Pull kubeconfig from remote as 
late as possible to avoid extra + // network time before validation is done. + masters := viper.GetStringSlice("masters") + kubectl, err := getKubectlFromAnyMaster(log.WithFields(log.Fields{}), masters) + if err != nil { + return err + } + + defer kubectl.Destroy() + + // TODO: Move to pkg + jobContents, err := ioutil.ReadFile(job.File) + if err != nil { + return err + } + + jobText, err := envsubst.GetEnvsubstArgs(params, string(jobContents)) + if err != nil { + return err + } + + output, err := hope.KubectlGetCreateStdIn(kubectl, jobText) + if err != nil { + return err + } + + // Grab the job name from the output + re, err := regexp.Compile("job\\.batch/([^\\s]+)") + if err != nil { + return err + } + + kubeJobNameMatches := re.FindStringSubmatch(output) + if len(kubeJobNameMatches) != 2 { + return errors.New(fmt.Sprintf("Failed to parse job name from output: %s", output)) + } + + kubeJobName := kubeJobNameMatches[1] + + return hope.FollowLogsAndPollUntilJobComplete(log.WithFields(log.Fields{}), kubectl, kubeJobName, 10, 12) + }, +} diff --git a/cmd/hope/utils.go b/cmd/hope/utils.go index 60d8ba5..5584c29 100644 --- a/cmd/hope/utils.go +++ b/cmd/hope/utils.go @@ -37,6 +37,14 @@ type Resource struct { Job string } +// TODO: Allow jobs to define max retry parameters, or accept them on the +// command line. +type Job struct { + Name string + File string + Parameters []string +} + func (resource *Resource) GetType() (string, error) { detectedTypes := []string{} if len(resource.File) != 0 { @@ -81,3 +89,18 @@ func getResources() (*[]Resource, error) { err := viper.UnmarshalKey("resources", &resources) return &resources, err } + +func getJob(jobName string) (*Job, error) { + var jobs []Job + err := viper.UnmarshalKey("jobs", &jobs) + if err != nil { + return nil, err + } + + for _, job := range jobs { + if job.Name == jobName { + return &job, nil + } + } + return nil, errors.New(fmt.Sprintf("Failed to find a job named %s", jobName)) +} diff --git a/hope.yaml b/hope.yaml index ff185f5..4e55594 100644 --- a/hope.yaml +++ b/hope.yaml @@ -68,3 +68,21 @@ resources: # set to be deleted by any mechanism. - name: wait-for-some-kind-of-job job: init-the-database +# Jobs contains a collection of specifications of templated jobs that can be +# run on demand in the cluster. +# These jobs shouldn't be associated to the deployment of any particular +# service, or be a part of the main execution of a service. +# These should be used for more operational repairs/manipulations. +# Jobs should use generateName to ensure that unique instances are created when +# called upon, rather than failing to create because of duplicate names. +# Parameters for these jobs should be provided using the -p X=Y flag; these +# parameters will be populated in the source file using envsubst. +# Arguments not provided in the args list will not be populated in the given +# file, as those may be arguments intended to be populated through the job's +# spec. +# Jobs will be started, and logs will be streamed to the client. 
+jobs: + - name: some-unscheduled-job + file: /path/to/some/job.yaml + parameters: + - SOMETHING_TO_POPULATE diff --git a/pkg/envsubst/envsubst.go b/pkg/envsubst/envsubst.go index 724cfe1..dbedf5f 100644 --- a/pkg/envsubst/envsubst.go +++ b/pkg/envsubst/envsubst.go @@ -1,12 +1,14 @@ package envsubst import ( + "fmt" "os" "os/exec" "strings" ) type GetEnvsubstStringFunc func(str string) (string, error) +type GetEnvsubstStringArgsFunc func(args map[string]string, str string) (string, error) var GetEnvsubst GetEnvsubstStringFunc = func(str string) (string, error) { osCmd := exec.Command("envsubst") @@ -16,3 +18,23 @@ var GetEnvsubst GetEnvsubstStringFunc = func(str string) (string, error) { outputBytes, err := osCmd.Output() return string(outputBytes), err } + +var GetEnvsubstArgs GetEnvsubstStringArgsFunc = func(args map[string]string, str string) (string, error) { + argsKeys := []string{} + for key, _ := range args { + argsKeys = append(argsKeys, fmt.Sprintf("$%s", key)) + } + + osCmd := exec.Command("envsubst", strings.Join(argsKeys, ",")) + osCmd.Env = os.Environ() + + for key, value := range args { + osCmd.Env = append(osCmd.Env, fmt.Sprintf("%s=%s", key, value)) + } + + osCmd.Stdin = strings.NewReader(str) + osCmd.Stderr = os.Stderr + + outputBytes, err := osCmd.Output() + return string(outputBytes), err +} diff --git a/pkg/hope/deployment.go b/pkg/hope/deployment.go index 1fb281d..0ca2c8a 100644 --- a/pkg/hope/deployment.go +++ b/pkg/hope/deployment.go @@ -11,3 +11,11 @@ func KubectlApplyF(kubectl *kubeutil.Kubectl, path string) error { func KubectlApplyStdIn(kubectl *kubeutil.Kubectl, stdin string) error { return kubeutil.InKubectl(kubectl, stdin, "apply", "-f", "-") } + +func KubectlCreateStdIn(kubectl *kubeutil.Kubectl, stdin string) error { + return kubeutil.InKubectl(kubectl, stdin, "create", "-f", "-") +} + +func KubectlGetCreateStdIn(kubectl *kubeutil.Kubectl, stdin string) (string, error) { + return kubeutil.GetInKubectl(kubectl, stdin, "create", "-f", "-") +} diff --git a/pkg/hope/kubectl_jobs.go b/pkg/hope/kubectl_jobs.go index 3951c1e..876d0d2 100644 --- a/pkg/hope/kubectl_jobs.go +++ b/pkg/hope/kubectl_jobs.go @@ -1,8 +1,11 @@ package hope import ( + "errors" "fmt" + "math" "strings" + "time" ) import ( @@ -16,10 +19,10 @@ import ( type JobStatus int const ( - JobStatusUnknown JobStatus = 0 - JobStatusRunning = 1 - JobStatusComplete = 2 - JobStatusFailed = 3 + JobStatusUnknown JobStatus = 0 + JobStatusRunning = 1 + JobStatusComplete = 2 + JobStatusFailed = 3 ) // Check to see if the provided job has completed, or is still running. @@ -35,25 +38,93 @@ func GetJobStatus(log *logrus.Entry, kubectl *kubeutil.Kubectl, job string) (Job case "Failed": return JobStatusFailed, nil default: - return JobStatusRunning, nil + return JobStatusRunning, nil } } func FollowLogsIfContainersRunning(kubectl *kubeutil.Kubectl, job string) error { jobSelector := fmt.Sprintf("job-name=%s", job) - - // Wait for this loop to finish without failure. 
- // Exponential backoff for re-attempts, max 12 seconds return kubeutil.ExecKubectl(kubectl, "logs", "-f", "-l", jobSelector) } func GetPodsForJob(kubectl *kubeutil.Kubectl, job string) (*[]string, error) { jobSelector := fmt.Sprintf("job-name=%s", job) - output, err := kubeutil.GetKubectl(kubectl, "get", "pods", "-l", jobSelector, "-o", "template={{range .items}}{{.metadata.name}}{{end}}") + output, err := kubeutil.GetKubectl(kubectl, "get", "pods", "-l", jobSelector, "-o", "template={{range .items}}{{.metadata.name}} {{end}}") if err != nil { return nil, err } - pods := strings.Split(output, "\n") + pods := strings.Split(strings.TrimSpace(output), " ") return &pods, nil } + +func FollowLogsAndPollUntilJobComplete(log *logrus.Entry, kubectl *kubeutil.Kubectl, job string, maxAttempts int, failedPollDelayMaxSeconds int) error { + // Check the job status before anything. + // It's possible that the job ran long ago, and pods have been cleaned up. + // If that's the case, attempting to attach to logs will fail; and that + // won't be straight-forward to recover from. + status, err := GetJobStatus(log, kubectl, job) + if err != nil { + return err + } + + switch status { + case JobStatusFailed: + return errors.New(fmt.Sprintf("Job %s failed.", job)) + case JobStatusComplete: + log.Debug("Job ", job, " successful.") + return nil + } + + for attempt := 0; attempt < maxAttempts; attempt++ { + attemptsDuration := math.Pow(2, float64(attempt)) + onFailureSleepSeconds := int(math.Min(attemptsDuration, float64(failedPollDelayMaxSeconds))) + + logsErr := FollowLogsIfContainersRunning(kubectl, job) + if logsErr != nil { + log.Warn(logsErr) + } + + // Logs may have successfully attached and printed for failed + // containers, so just because the log function succeeded, we + // can't assume success. + status, err := GetJobStatus(log, kubectl, job) + if err != nil { + return err + } + + switch status { + case JobStatusFailed: + return errors.New(fmt.Sprintf("Job %s failed.", job)) + case JobStatusComplete: + log.Debug("Job ", job, " successful.") + return nil + } + + if onFailureSleepSeconds == failedPollDelayMaxSeconds { + log.Debug("Checking pod events for details...") + // Check the event log for the pods associated with this job. + // There may be something useful in there. + pods, err := GetPodsForJob(kubectl, job) + if err != nil { + log.Warn(err) + } else { + // TODO: Keep track of which pods have been printed, and if + // there have been no events for a given pod since the last + // time we tried to print them, don't print anything. + for _, pods := range *pods { + involvedObject := fmt.Sprintf("involvedObject.name=%s", pods) + kubeutil.ExecKubectl(kubectl, "get", "events", "--field-selector", involvedObject) + } + } + } + if logsErr != nil { + log.Warn("Failed to fetch logs for job ", job, ". Waiting ", onFailureSleepSeconds, " seconds and trying again.") + } else { + log.Warn("Logs fetched, but job ", job, " is still running. Waiting ", onFailureSleepSeconds, " seconds and trying again.") + } + time.Sleep(time.Second * time.Duration(onFailureSleepSeconds)) + } + + return errors.New(fmt.Sprintf("Job did not finish within %d attempts. 
The job may still be running.", maxAttempts)) +} diff --git a/pkg/kubeutil/kubeutil.go b/pkg/kubeutil/kubeutil.go index 013a3bd..e89d4d2 100644 --- a/pkg/kubeutil/kubeutil.go +++ b/pkg/kubeutil/kubeutil.go @@ -16,12 +16,12 @@ import ( type GetKubectlFunc func(kubectl *Kubectl, args ...string) (string, error) type ExecKubectlFunc func(kubectl *Kubectl, args ...string) error type InKubectlFunc func(kubectl *Kubectl, stdin string, args ...string) error +type GetInKubectlFunc func(kubectl *Kubectl, stdin string, args ...string) (string, error) var GetKubectl GetKubectlFunc = func(kubectl *Kubectl, args ...string) (string, error) { osCmd := exec.Command("kubectl", args...) osCmd.Env = append(os.Environ(), fmt.Sprintf("KUBECONFIG=%s", kubectl.KubeconfigPath)) osCmd.Stdin = os.Stdin - osCmd.Stdin = os.Stdin osCmd.Stderr = os.Stderr outputBytes, err := osCmd.Output() @@ -48,6 +48,16 @@ var InKubectl InKubectlFunc = func(kubectl *Kubectl, stdin string, args ...strin return osCmd.Run() } +var GetInKubectl GetInKubectlFunc = func(kubectl *Kubectl, stdin string, args ...string) (string, error) { + osCmd := exec.Command("kubectl", args...) + osCmd.Env = append(os.Environ(), fmt.Sprintf("KUBECONFIG=%s", kubectl.KubeconfigPath)) + osCmd.Stdin = strings.NewReader(stdin) + osCmd.Stderr = os.Stderr + + outputBytes, err := osCmd.Output() + return string(outputBytes), err +} + // Get the name by which the cluster recognizes a given host. func NodeNameFromHost(kubectl *Kubectl, host string) (string, error) { nodesOutput, err := GetKubectl(kubectl, "get", "nodes", "-o", "custom-columns=NODE:metadata.name,IP:status.addresses[?(@.type=='InternalIP')].address")
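
With the sample job defined in hope.yaml above, the new command takes the job name as its single argument plus one -p flag per declared parameter. Every parameter listed under the job must be supplied, unrecognized parameters are rejected, and each -p value is split on "=" and must contain exactly one of them, so values that themselves contain an "=" will not parse. A hypothetical invocation, assuming the built binary is named hope:

    hope run some-unscheduled-job -p SOMETHING_TO_POPULATE=some-value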
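
GetEnvsubstArgs limits substitution to the supplied keys by passing a shell-format string (for example "$SOMETHING_TO_POPULATE") to envsubst, so variables that are meant to stay literal in the job spec pass through untouched. A minimal sketch of a direct call, assuming GNU gettext's envsubst is on the PATH; the manifest snippet and values are made up for illustration:

    package main

    import (
        "fmt"
        "log"

        "github.com/Eagerod/hope/pkg/envsubst"
    )

    func main() {
        // Only $SOMETHING_TO_POPULATE is named in the args map, so $HOME is
        // left as the literal string $HOME even though it is set in the
        // environment.
        manifest := "image: registry.internal/tool:$SOMETHING_TO_POPULATE\nhome: $HOME\n"

        out, err := envsubst.GetEnvsubstArgs(
            map[string]string{"SOMETHING_TO_POPULATE": "v1.2.3"},
            manifest,
        )
        if err != nil {
            log.Fatal(err)
        }

        // Expected: the image line ends in v1.2.3, the home line is unchanged.
        fmt.Print(out)
    }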
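
Because the job specs are expected to use generateName, run.go recovers the generated job name from the output of kubectl create -f - with a regular expression. A standalone sketch of that parsing step; the sample output line is hypothetical but follows kubectl's usual "job.batch/<name> created" shape:

    package main

    import (
        "fmt"
        "regexp"
    )

    func main() {
        // Same pattern as cmd/hope/run.go.
        re := regexp.MustCompile(`job\.batch/([^\s]+)`)

        output := "job.batch/some-unscheduled-job-x7k2p created\n"
        matches := re.FindStringSubmatch(output)
        if len(matches) != 2 {
            fmt.Println("failed to parse job name from:", output)
            return
        }

        // Prints: some-unscheduled-job-x7k2p
        fmt.Println(matches[1])
    }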
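
FollowLogsAndPollUntilJobComplete re-attaches to logs and re-polls the job status with an exponential backoff capped at failedPollDelayMaxSeconds; deploy passes (10, 60) and run passes (10, 12). A minimal sketch of just the delay calculation, pulled out of the loop for illustration:

    package main

    import (
        "fmt"
        "math"
    )

    // backoffSeconds mirrors the delay computation in the retry loop:
    // 2^attempt seconds, capped at maxSeconds.
    func backoffSeconds(attempt, maxSeconds int) int {
        delay := math.Pow(2, float64(attempt))
        return int(math.Min(delay, float64(maxSeconds)))
    }

    func main() {
        // With maxAttempts=10 and a 12 second cap (the values `hope run`
        // uses), the delays are 1 2 4 8 12 12 12 12 12 12.
        for attempt := 0; attempt < 10; attempt++ {
            fmt.Print(backoffSeconds(attempt, 12), " ")
        }
        fmt.Println()
    }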
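
GetPodsForJob's go-template now emits each pod name followed by a space instead of concatenating names with no separator, and the caller trims and splits on spaces rather than newlines. A small sketch of that parsing step; the pod names are hypothetical:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Shape of the kubectl template output after this change: pod names
        // separated by spaces, with a trailing space.
        output := "init-the-database-abc12 init-the-database-def34 "

        pods := strings.Split(strings.TrimSpace(output), " ")
        fmt.Println(pods) // [init-the-database-abc12 init-the-database-def34]
    }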