Skip to content

Commit

Permalink
Adds in the ability to run individual jobs on demand (#19)
Browse files Browse the repository at this point in the history
* Adds in the ability to run individual jobs on demand

* Supes up the job running process to properly manage successful attaches.

* Fixed up hope.yaml job definition to not reference local stuff.
  • Loading branch information
Eagerod authored Sep 26, 2020
1 parent 017e941 commit 4ec74cb
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 69 deletions.
58 changes: 2 additions & 56 deletions cmd/hope/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package cmd
import (
"errors"
"fmt"
"math"
"strings"
"time"
)

import (
Expand All @@ -18,12 +16,10 @@ import (
"github.com/Eagerod/hope/pkg/docker"
"github.com/Eagerod/hope/pkg/envsubst"
"github.com/Eagerod/hope/pkg/hope"
"github.com/Eagerod/hope/pkg/kubeutil"
)

const MaximumJobDeploymentPollSeconds int = 60

// rootCmd represents the base command when called without any subcommands
var deployCmd = &cobra.Command{
Use: "deploy",
Short: "Deploy a Kubernetes yaml file",
Expand Down Expand Up @@ -140,58 +136,8 @@ var deployCmd = &cobra.Command{
return err
}
case ResourceTypeJob:
// Exponential backoff maxing out at 60 seconds.
// TODO: Implement maximum retries, or other throughput-related
// controls
// TODO: Fetch more detailed job status information to show on
// the console.
attempts := 1
jobLogger := log.WithFields(log.Fields{})
for ok := false; !ok; {
status, err := hope.GetJobStatus(jobLogger, kubectl, resource.Job)
if err != nil {
return err
}

switch status {
case hope.JobStatusFailed:
return errors.New(fmt.Sprintf("Job %s failed.", resource.Job))
case hope.JobStatusComplete:
log.Debug("Job ", resource.Job, " successful.")
ok = true
break
default:
// If the job is running, start polling for logs.
// Jobs that failed or completed long in the past may
// have had their pods wiped since they ran.
if err := hope.FollowLogsIfContainersRunning(kubectl, resource.Job); err != nil {
log.Warn(err)
attemptsDuration := math.Pow(2, float64(attempts-1))
sleepSeconds := int(math.Min(attemptsDuration, float64(MaximumJobDeploymentPollSeconds)))

if sleepSeconds == MaximumJobDeploymentPollSeconds {
log.Debug("Checking pod events for details...")
// Check the event log for the pods associated
// with this job.
// There may be something useful in there.
pods, err := hope.GetPodsForJob(kubectl, resource.Job)
if err != nil {
log.Warn(err)
continue
}

for _, pods := range *pods {
involvedObject := fmt.Sprintf("involvedObject.name=%s", pods)
kubeutil.ExecKubectl(kubectl, "get", "events", "--field-selector", involvedObject)
}
}

log.Warn("Failed to attach to logs for job ", resource.Job, ". Waiting ", sleepSeconds, " seconds and trying again.")

time.Sleep(time.Second * time.Duration(sleepSeconds))
attempts = attempts + 1
}
}
if err := hope.FollowLogsAndPollUntilJobComplete(log.WithFields(log.Fields{}), kubectl, resource.Job, 10, 60); err != nil {
return err
}
default:
return errors.New(fmt.Sprintf("Resource type (%s) not implemented.", resourceType))
Expand Down
17 changes: 15 additions & 2 deletions cmd/hope/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ func Execute() {
rootCmd.AddCommand(hostnameCmd)
rootCmd.AddCommand(kubeconfigCmd)
rootCmd.AddCommand(resetCmd)
rootCmd.AddCommand(runCmd)
rootCmd.AddCommand(sshCmd)
rootCmd.AddCommand(tokenCmd)

initHostnameCmdFlags()
initKubeconfigCmdFlags()
initResetCmd()
initRunCmdFlags()
initSshCmd()
initTokenCmd()

Expand Down Expand Up @@ -129,7 +131,7 @@ func initLogger() {
log.Error(configParseError)
}

log.Debug("Using config file:", viper.ConfigFileUsed())
log.Debug("Using config file: ", viper.ConfigFileUsed())
}

func patchInvocations() {
Expand All @@ -147,10 +149,21 @@ func patchInvocations() {

oldEnvSubst := envsubst.GetEnvsubst
envsubst.GetEnvsubst = func(str string) (string, error) {
log.Debug("echo **(", len(str), " chars)** | envsubst ")
log.Debug("echo **(", len(str), " chars)** | envsubst")
return oldEnvSubst(str)
}

oldEnvSubstArgs := envsubst.GetEnvsubstArgs
envsubst.GetEnvsubstArgs = func(args map[string]string, str string) (string, error) {
argsKeys := []string{}
for key, _ := range args {
argsKeys = append(argsKeys, fmt.Sprintf("$%s", key))
}

log.Debug("echo **(", len(str), " chars)** | envsubst ", strings.Join(argsKeys, ","))
return oldEnvSubstArgs(args, str)
}

oldExecKubectl := kubeutil.ExecKubectl
kubeutil.ExecKubectl = func(kubectl *kubeutil.Kubectl, args ...string) error {
log.Debug("kubectl ", strings.Join(args, " "))
Expand Down
110 changes: 110 additions & 0 deletions cmd/hope/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package cmd

import (
"io/ioutil"
"errors"
"fmt"
"regexp"
"strings"
)

import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

import (
"github.com/Eagerod/hope/pkg/envsubst"
"github.com/Eagerod/hope/pkg/hope"
)

var runCmdParameterSlice *[]string

func initRunCmdFlags() {
runCmdParameterSlice = runCmd.Flags().StringArrayP("param", "p", []string{}, "parameters to populate in the job yaml")
}

var runCmd = &cobra.Command{
Use: "run",
Short: "Execute and follow a Kubernetes job",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
jobName := args[0]

job, err := getJob(jobName)
if err != nil {
return err
}

remainingParams := map[string]bool{}
for _, param := range job.Parameters {
remainingParams[param] = true
}

params := map[string]string{}
for _, param := range *runCmdParameterSlice {
components := strings.Split(param, "=")
if len(components) != 2 {
return errors.New(fmt.Sprintf("Failed to parse argument: %s", param))
}

paramName := components[0]
paramValue := components[1]

params[paramName] = paramValue
if _, ok := remainingParams[paramName]; !ok {
return errors.New(fmt.Sprintf("Parameter: %s not recognized", paramName))
}

remainingParams[paramName] = false
}

for param, missed := range remainingParams {
if missed {
return errors.New(fmt.Sprintf("Failed to find parameter: %s", param))
}
}

// Pull kubeconfig from remote as late as possible to avoid extra
// network time before validation is done.
masters := viper.GetStringSlice("masters")
kubectl, err := getKubectlFromAnyMaster(log.WithFields(log.Fields{}), masters)
if err != nil {
return err
}

defer kubectl.Destroy()

// TODO: Move to pkg
jobContents, err := ioutil.ReadFile(job.File)
if err != nil {
return err
}

jobText, err := envsubst.GetEnvsubstArgs(params, string(jobContents))
if err != nil {
return err
}

output, err := hope.KubectlGetCreateStdIn(kubectl, jobText)
if err != nil {
return err
}

// Grab the job name from the output
re, err := regexp.Compile("job\\.batch/([^\\s]+)")
if err != nil {
return err
}

kubeJobNameMatches := re.FindStringSubmatch(output)
if len(kubeJobNameMatches) != 2 {
return errors.New(fmt.Sprintf("Failed to parse job name from output: %s", output))
}

kubeJobName := kubeJobNameMatches[1]

return hope.FollowLogsAndPollUntilJobComplete(log.WithFields(log.Fields{}), kubectl, kubeJobName, 10, 12)
},
}
23 changes: 23 additions & 0 deletions cmd/hope/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ type Resource struct {
Job string
}

// TODO: Allow jobs to define max retry parameters, or accept them on the
// command line.
type Job struct {
Name string
File string
Parameters []string
}

func (resource *Resource) GetType() (string, error) {
detectedTypes := []string{}
if len(resource.File) != 0 {
Expand Down Expand Up @@ -81,3 +89,18 @@ func getResources() (*[]Resource, error) {
err := viper.UnmarshalKey("resources", &resources)
return &resources, err
}

func getJob(jobName string) (*Job, error) {
var jobs []Job
err := viper.UnmarshalKey("jobs", &jobs)
if err != nil {
return nil, err
}

for _, job := range jobs {
if job.Name == jobName {
return &job, nil
}
}
return nil, errors.New(fmt.Sprintf("Failed to find a job named %s", jobName))
}
18 changes: 18 additions & 0 deletions hope.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,21 @@ resources:
# set to be deleted by any mechanism.
- name: wait-for-some-kind-of-job
job: init-the-database
# Jobs contains a collection of specifications of templated jobs that can be
# run on demand in the cluster.
# These jobs shouldn't be associated to the deployment of any particular
# service, or be a part of the main execution of a service.
# These should be used for more operational repairs/manipulations.
# Jobs should use generateName to ensure that unique instances are created when
# called upon, rather than failing to create because of duplicate names.
# Parameters for these jobs should be provided using the -p X=Y flag; these
# parameters will be populated in the source file using envsubst.
# Arguments not provided in the args list will not be populated in the given
# file, as those may be arguments intended to be populated through the job's
# spec.
# Jobs will be started, and logs will be streamed to the client.
jobs:
- name: some-unscheduled-job
file: /path/to/some/job.yaml
parameters:
- SOMETHING_TO_POPULATE
22 changes: 22 additions & 0 deletions pkg/envsubst/envsubst.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package envsubst

import (
"fmt"
"os"
"os/exec"
"strings"
)

type GetEnvsubstStringFunc func(str string) (string, error)
type GetEnvsubstStringArgsFunc func(args map[string]string, str string) (string, error)

var GetEnvsubst GetEnvsubstStringFunc = func(str string) (string, error) {
osCmd := exec.Command("envsubst")
Expand All @@ -16,3 +18,23 @@ var GetEnvsubst GetEnvsubstStringFunc = func(str string) (string, error) {
outputBytes, err := osCmd.Output()
return string(outputBytes), err
}

var GetEnvsubstArgs GetEnvsubstStringArgsFunc = func(args map[string]string, str string) (string, error) {
argsKeys := []string{}
for key, _ := range args {
argsKeys = append(argsKeys, fmt.Sprintf("$%s", key))
}

osCmd := exec.Command("envsubst", strings.Join(argsKeys, ","))
osCmd.Env = os.Environ()

for key, value := range args {
osCmd.Env = append(osCmd.Env, fmt.Sprintf("%s=%s", key, value))
}

osCmd.Stdin = strings.NewReader(str)
osCmd.Stderr = os.Stderr

outputBytes, err := osCmd.Output()
return string(outputBytes), err
}
8 changes: 8 additions & 0 deletions pkg/hope/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,11 @@ func KubectlApplyF(kubectl *kubeutil.Kubectl, path string) error {
func KubectlApplyStdIn(kubectl *kubeutil.Kubectl, stdin string) error {
return kubeutil.InKubectl(kubectl, stdin, "apply", "-f", "-")
}

func KubectlCreateStdIn(kubectl *kubeutil.Kubectl, stdin string) error {
return kubeutil.InKubectl(kubectl, stdin, "create", "-f", "-")
}

func KubectlGetCreateStdIn(kubectl *kubeutil.Kubectl, stdin string) (string, error) {
return kubeutil.GetInKubectl(kubectl, stdin, "create", "-f", "-")
}
Loading

0 comments on commit 4ec74cb

Please sign in to comment.