From 165d2a1d7a84a3f088d89b4574fe505234e57517 Mon Sep 17 00:00:00 2001 From: m3ngyang Date: Wed, 11 Jul 2018 21:07:52 +0800 Subject: [PATCH 1/3] refactor the process of controlling a training job --- cmd/edl/edl.go | 3 +- pkg/apis/paddlepaddle/v1/types.go | 12 +- pkg/autoscaler/autoscaler.go | 26 +- pkg/autoscaler/resource.go | 2 +- pkg/controller/trainingjob_controller.go | 251 +++++---- pkg/updater/jobparser.go | 2 +- pkg/updater/trainingJobUpdater.go | 589 -------------------- pkg/updater/trainingjob_updater.go | 653 +++++++++++++++++++++++ 8 files changed, 819 insertions(+), 719 deletions(-) delete mode 100644 pkg/updater/trainingJobUpdater.go create mode 100644 pkg/updater/trainingjob_updater.go diff --git a/cmd/edl/edl.go b/cmd/edl/edl.go index 2e68eb73..52a29923 100644 --- a/cmd/edl/edl.go +++ b/cmd/edl/edl.go @@ -34,6 +34,7 @@ var ( func main() { masterURL := flag.String("master", "", "Address of a kube master.") kubeConfig := flag.String("kubeconfig", "", "Path to a kube config. Only required if out-of-cluster.") + autoClean := flag.Bool("autoclean", false, "Auto clean pods after terminating job, default false") maxLoadDesired := flag.Float64("max_load_desired", 0.97, `Keep the cluster max resource usage around this value, jobs will scale down if total request is over this level.`) flag.Parse() @@ -58,7 +59,7 @@ func main() { run := func(stop <-chan struct{}) { log.Info("I won the leader election", "hostname", hostname) paddleInformer := paddleinformers.NewSharedInformerFactory(paddleClient, time.Second*10) - controller := paddlecontroller.New(kubeClient, extapiClient, paddleClient, paddleInformer) + controller := paddlecontroller.New(kubeClient, extapiClient, paddleClient, paddleInformer, *autoClean) go paddleInformer.Start(stopCh) if controller.Run(1, *maxLoadDesired, stopCh); err != nil { diff --git a/pkg/apis/paddlepaddle/v1/types.go b/pkg/apis/paddlepaddle/v1/types.go index b6a3aa23..0086860e 100644 --- a/pkg/apis/paddlepaddle/v1/types.go +++ b/pkg/apis/paddlepaddle/v1/types.go @@ -140,12 +140,12 @@ type TrainerJobScaleRecords struct { type TrainingResourceType string const ( - // Master is the master name of TrainingResourceType. - Master TrainingResourceType = "MASTER" - // Pserver is the pserver name of TrainingResourceType. - Pserver TrainingResourceType = "PSERVER" - // Trainer is the trainer name of TrainingResourceType. - Trainer TrainingResourceType = "TRAINER" + // MASTER is the master name of TrainingResourceType. + MASTER TrainingResourceType = "master" + // PSERVER is the pserver name of TrainingResourceType. + PSERVER TrainingResourceType = "pserver" + // TRAINER is the trainer name of TrainingResourceType. + TRAINER TrainingResourceType = "trainer" ) // ResourceState is the state of a type of resource diff --git a/pkg/autoscaler/autoscaler.go b/pkg/autoscaler/autoscaler.go index 4c30fe07..ca07ce37 100644 --- a/pkg/autoscaler/autoscaler.go +++ b/pkg/autoscaler/autoscaler.go @@ -154,7 +154,7 @@ func (a *Autoscaler) totalRunningJob(jobName string) bool { if !ok { return false } - up, ok := v.(*updater.TrainingJobUpdater) + up, ok := v.(*updater.JobUpdater) if !ok { return false } @@ -201,7 +201,7 @@ func scaleDryRun(r *ClusterResource, j *padv1.TrainingJob, curDiff int32, maxLoa nodeName := "" // Adjust resource upon return. 
defer func() { - log.Debug("scaleDryRun", "scaledown", scaleDown, "jobns", j.Namespace, "jobname", j.Name, "additional", additional) + log.Debug("scaleDryRun", "scaledown", scaleDown, "namespace", j.Namespace, "jobname", j.Name, "additional", additional) r.GPULimit += gpuLimit * additional r.CPURequestMilli += cpuRequestMilli * int64(additional) r.MemoryRequestMega += memRequestMega * int64(additional) @@ -219,6 +219,7 @@ func scaleDryRun(r *ClusterResource, j *padv1.TrainingJob, curDiff int32, maxLoa plannedInstance := int(*j.Spec.Trainer.ReplicaSpec.Spec.Parallelism) + int(curDiff) instanceMax := j.Spec.Trainer.MaxInstance instanceMin := j.Spec.Trainer.MinInstance + log.Debug("scaleDryRun instance num", "min", instanceMin, "max", instanceMax, "planned", plannedInstance) // TODO(typhoonzero): refine below code to remove direction // ======================= scaleDown ====================== @@ -236,6 +237,7 @@ func scaleDryRun(r *ClusterResource, j *padv1.TrainingJob, curDiff int32, maxLoa log.Debug("scaleDryRun", "gpuRequest", r.GPULimit, "threshold", gpuThreshold) log.Debug("scaleDryRun", "cpuRequest", r.CPURequestMilli, "threshold", cpuThreshold) log.Debug("scaleDryRun", "memRequest", r.MemoryRequestMega, "threshold", memThreshold) + log.Debug("scaleDryRun conditions", "gpuCondition", gpuCondition, "cpuCondition", cpuCondition, "memCondition", memCondition) if gpuCondition || cpuCondition || memCondition { if plannedInstance > instanceMin { additional = -1 @@ -297,7 +299,7 @@ func scaleDryRun(r *ClusterResource, j *padv1.TrainingJob, curDiff int32, maxLoa func (a *Autoscaler) setAdditional(diff map[string]int32) { a.jobUpdater.Range(func(k, v interface{}) bool { key := k.(string) - up := v.(*updater.TrainingJobUpdater) + up := v.(*updater.JobUpdater) var additional int32 if val, ok := diff[key]; ok { additional = val @@ -349,17 +351,6 @@ func scaleAllJobsDryRun(jobs []*padv1.TrainingJob, r ClusterResource, maxLoadDes return diff } -func (a *Autoscaler) scaleAllJobs() { - a.jobUpdater.Range(func(k, v interface{}) bool { - up := v.(*updater.TrainingJobUpdater) - if up.Additional != 0 { - log.Info("additional of trainingjob", "jobname", k, "scalenum", up.Additional) - up.Scale() - } - return true - }) -} - // Run monitors the cluster resources and training jobs in a loop, // scales the training jobs according to the cluster resource. 
func (a *Autoscaler) Run() { @@ -381,9 +372,8 @@ func (a *Autoscaler) Run() { a.findTrainingJobsMightBeRescheduled(havePending), r, a.maxLoadDesired) - log.Info("Calculated info", "diff:", diff) + log.Info("Calculated info", "diff", diff) a.setAdditional(diff) - a.scaleAllJobs() } } @@ -395,7 +385,7 @@ func (a *Autoscaler) findPendingJob() bool { log.Debug("findPendingJob check", "jobname", k) total := 0 pending := 0 - up, ok := v.(*updater.TrainingJobUpdater) + up, ok := v.(*updater.JobUpdater) if !ok { log.Debug("findPendingJob conversion error", "jobname", k) } @@ -439,7 +429,7 @@ func (a *Autoscaler) findTrainingJobsMightBeRescheduled(havePending bool) (js tr jn := k.(string) log.Debug("findTrainingJobsMightBeRescheduled", "jobname", jn) - up, ok := v.(*updater.TrainingJobUpdater) + up, ok := v.(*updater.JobUpdater) if !ok { return false } diff --git a/pkg/autoscaler/resource.go b/pkg/autoscaler/resource.go index 44ef1d44..2100456f 100644 --- a/pkg/autoscaler/resource.go +++ b/pkg/autoscaler/resource.go @@ -62,7 +62,7 @@ func getPodsTotalRequestsAndLimits(podList *v1.PodList) (reqs v1.ResourceList, l func updateNodesIdleResource(podList *v1.PodList, nodesCPUIdleMilli map[string]int64, nodesMemoryFreeMega map[string]int64) (err error) { for _, pod := range podList.Items { podname := pod.Namespace + "/" + pod.Name - log.Debug("updateNodesIdleResource", "podName", podname) + log.Debug("updateNodesIdleResource", "podName", podname, "phase", pod.Status.Phase) nodeName := pod.Spec.NodeName if nodeName == "" { continue diff --git a/pkg/controller/trainingjob_controller.go b/pkg/controller/trainingjob_controller.go index 9cd5daf6..7aba2e50 100644 --- a/pkg/controller/trainingjob_controller.go +++ b/pkg/controller/trainingjob_controller.go @@ -6,10 +6,9 @@ import ( "time" log "github.com/inconshreveable/log15" - corev1 "k8s.io/api/core/v1" - extv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" - extclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" @@ -23,7 +22,7 @@ import ( paddlev1 "github.com/paddlepaddle/edl/pkg/apis/paddlepaddle/v1" "github.com/paddlepaddle/edl/pkg/autoscaler" - paddleclient "github.com/paddlepaddle/edl/pkg/client/clientset/versioned" + paddleclientset "github.com/paddlepaddle/edl/pkg/client/clientset/versioned" paddlescheme "github.com/paddlepaddle/edl/pkg/client/clientset/versioned/scheme" paddleinformers "github.com/paddlepaddle/edl/pkg/client/informers/externalversions" paddlelisters "github.com/paddlepaddle/edl/pkg/client/listers/paddlepaddle/v1" @@ -34,16 +33,16 @@ import ( type TrainingJobController struct { // KubeCli is a standard kubernetes clientset KubeCli kubernetes.Interface - // ExtCli is the extension kubernetes clientset - ExtCli extclient.Interface + // ApiCli is the extension kubernetes clientset + ApiCli apiextensionsclient.Interface // PaddleCli is a clientset for our own API group - PaddleCli paddleclient.Interface + PaddleCli paddleclientset.Interface trainingjobLister paddlelisters.TrainingJobLister trainingjobSynced cache.InformerSynced - // jobupdater is the collection of job updaters for each training job - jobupdater *sync.Map + // jobtracker keeps a map from job full name to its updater + jobtracker 
*sync.Map // workqueue is a rate limited work queue. This is used to queue work to be // processed instead of performing it as soon as a change happens. @@ -51,52 +50,73 @@ type TrainingJobController struct { // recorder is an event recorder for recording Event resources to the // Kubernetes API. recorder record.EventRecorder + + // autoclean means whether or not cleaning pods after termination. + autoclean bool } -// New returns a trainingjob controller func New( kubeCli kubernetes.Interface, - extCli extclient.Interface, - paddleCli paddleclient.Interface, - paddleInformers paddleinformers.SharedInformerFactory) *TrainingJobController { - trainingjobInformer := paddleInformers.Paddlepaddle().V1().TrainingJobs() - paddlescheme.AddToScheme(scheme.Scheme) + apiCli apiextensionsclient.Interface, + paddleCli paddleclientset.Interface, + tjInformer paddleinformers.SharedInformerFactory, + auto bool) *TrainingJobController { - log.Info("Creating trainingjob event broadcaster") + traingingjobInformer := tjInformer.Paddlepaddle().V1().TrainingJobs() + + paddlescheme.AddToScheme(scheme.Scheme) + log.Debug("Creating trainingjob event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(log.Info) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeCli.CoreV1().Events("")}) - workqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "TrainingJob") recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "TrainingJobController"}) controller := &TrainingJobController{ KubeCli: kubeCli, - ExtCli: extCli, + ApiCli: apiCli, PaddleCli: paddleCli, - trainingjobLister: trainingjobInformer.Lister(), - trainingjobSynced: trainingjobInformer.Informer().HasSynced, - jobupdater: new(sync.Map), - workqueue: workqueue, + trainingjobLister: traingingjobInformer.Lister(), + trainingjobSynced: traingingjobInformer.Informer().HasSynced, + jobtracker: new(sync.Map), + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "TrainingJob"), recorder: recorder, + autoclean: auto, } log.Info("Setting up event handlers") - trainingjobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueue, - UpdateFunc: func(oldObj, newObj interface{}) { - oldTj := oldObj.(*paddlev1.TrainingJob) - newTj := newObj.(*paddlev1.TrainingJob) - objNs := oldTj.Namespace - objName := oldTj.Name - if oldTj.ResourceVersion == newTj.ResourceVersion { - log.Debug("same resourceversion for training job", objNs, "/", objName, ", skipped") - return - } - log.Debug("resourceversion for training job", objNs, "/", objName, "updated") - controller.enqueue(newObj) - }, - DeleteFunc: controller.dequeue, - }) + traingingjobInformer.Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch t := obj.(type) { + case *paddlev1.TrainingJob: + log.Debug("filter trainingjob", "namespace", t.Namespace, "name", t.Name) + return true + default: + return false + } + }, + + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + log.Debug("AddFunc called") + controller.enqueue(obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldTj := oldObj.(*paddlev1.TrainingJob) + newTj := newObj.(*paddlev1.TrainingJob) + if oldTj.ResourceVersion == newTj.ResourceVersion { + log.Debug("same resourceversion skipped", "namespace", oldTj.Namespace, "name", oldTj.Name) + return + } + 
log.Debug("resourceversion updated", "namespace", oldTj.Namespace, "name", oldTj.Name) + controller.enqueue(newObj) + }, + DeleteFunc: func(obj interface{}) { + log.Debug("DeleteFunc called") + controller.enqueue(obj) + }, + }, + }) return controller } @@ -106,9 +126,11 @@ func New( // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. func (c *TrainingJobController) Run(threadiness int, maxLoadDesired float64, stopCh <-chan struct{}) error { + // TODO add a lock to ensure there is only one controller in the cluster defer runtime.HandleCrash() defer c.workqueue.ShutDown() + log.Info("Starting trainingjob controller") log.Info("Starting to create custom resource definition") if err := c.createCRD(); err != nil { @@ -125,9 +147,12 @@ func (c *TrainingJobController) Run(threadiness int, maxLoadDesired float64, sto go wait.Until(c.runWorker, time.Second, stopCh) } + // gc := NewGarbageCollector(c.KubeCli, c.trainingjobLister) + // go gc.CleanOrphans(10 * time.Minute) + log.Info("Started workers") - as := autoscaler.NewAutoscaler(c.KubeCli, c.jobupdater, autoscaler.WithMaxLoadDesired(maxLoadDesired)) + as := autoscaler.NewAutoscaler(c.KubeCli, c.jobtracker, autoscaler.WithMaxLoadDesired(maxLoadDesired)) as.Run() <-stopCh @@ -137,15 +162,15 @@ func (c *TrainingJobController) Run(threadiness int, maxLoadDesired float64, sto } func (c *TrainingJobController) createCRD() error { - crd := &extv1beta1.CustomResourceDefinition{ + crd := &apiextensionsv1beta1.CustomResourceDefinition{ ObjectMeta: metav1.ObjectMeta{ Name: paddlev1.CRDName(), }, - Spec: extv1beta1.CustomResourceDefinitionSpec{ + Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{ Group: paddlev1.CRDGroup, Version: paddlev1.CRDVersion, - Scope: extv1beta1.NamespaceScoped, - Names: extv1beta1.CustomResourceDefinitionNames{ + Scope: apiextensionsv1beta1.NamespaceScoped, + Names: apiextensionsv1beta1.CustomResourceDefinitionNames{ Kind: paddlev1.CRDKind, Plural: paddlev1.CRDKindPlural, ShortNames: []string{paddlev1.CRDShortName}, @@ -153,9 +178,9 @@ func (c *TrainingJobController) createCRD() error { }, } - _, err := c.ExtCli.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd) + _, err := c.ApiCli.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd) if err != nil && !apierrors.IsAlreadyExists(err) { - log.Error("Failed to create crd, error", err.Error()) + log.Error("Failed to create crd", "err", err.Error()) return err } @@ -171,93 +196,113 @@ func (c *TrainingJobController) enqueue(obj interface{}) { runtime.HandleError(err) return } - log.Info("enqueue key:", key) + log.Info("enqueue", "key", key) c.workqueue.AddRateLimited(key) } -func (c *TrainingJobController) dequeue(obj interface{}) { - job, ok := obj.(*paddlev1.TrainingJob) - if !ok { - runtime.HandleError(fmt.Errorf("type conversion error: %+v", obj)) - return - } - key := job.Namespace + "/" + job.Name - log.Info("dequeue key:", key) - jobToDelete, ok := c.jobupdater.Load(key) - if !ok { - log.Warn("unsafe state.", key, "was never created but we received delete event") - return - } - - oldtj := jobToDelete.(*updater.TrainingJobUpdater) - oldtj.Delete() - c.jobupdater.Delete(key) -} - func (c *TrainingJobController) runWorker() { + log.Debug("Run worker again") for c.processNextWorkItem() { } } func (c *TrainingJobController) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() + key, shutdown := c.workqueue.Get() + if shutdown { return false } - err := func(obj 
interface{}) error { - defer c.workqueue.Done(obj) - var key string - var ok bool - if key, ok = obj.(string); !ok { - c.workqueue.Forget(obj) - runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil - } + defer c.workqueue.Done(key) - if err := c.syncHandler(key); err != nil { - return fmt.Errorf("error syncing '%s': %s", key, err.Error()) + forget, err := c.syncHandler(key.(string)) + if err == nil { + if forget { + c.workqueue.Forget(key) + log.Info("Successfully synced", "key", key.(string)) } - - c.workqueue.Forget(obj) - log.Info("Successfully synced", key) - return nil - }(obj) - - if err != nil { - runtime.HandleError(err) return true } + runtime.HandleError(fmt.Errorf("Error syncing job: %v", err)) + c.workqueue.AddRateLimited(key) + return true } -func (c *TrainingJobController) syncHandler(key string) error { - log.Info("syncHandler, key:", key) +func (c *TrainingJobController) syncHandler(key string) (bool, error) { + log.Info("syncHandler", "key", key) ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) - return nil + return false, nil } - log.Info("syncHandler for ", ns, "/", name) - - job, createErr := c.trainingjobLister.TrainingJobs(ns).Get(name) - if createErr != nil { - log.Error("get trainingjob error:", err.Error()) - if apierrors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("trainingjob '%s' in the work queue no longer exists", key)) - return nil + + jobIsDeleted := false + job, getErr := c.trainingjobLister.TrainingJobs(ns).Get(name) + if getErr != nil { + log.Debug("Error fetching TrainingJob", "key", key, "err", getErr.Error()) + if apierrors.IsNotFound(getErr) { + jobIsDeleted = true + } else { + return false, nil } + } else { + log.Debug("TrainingJob fetching status", "namespace", job.Namespace, "name", job.Name, "status", job.Status) + } - return err + var jobUpdater *updater.JobUpdater + jobUpdaterObj, exists := c.jobtracker.Load(key) + + if !exists { + if jobIsDeleted { + log.Debug("key not exist", "key", key) + return true, fmt.Errorf("JobNotExists") + } + log.Debug("TrainingJob new", "namespace", job.Namespace, "name", job.Name) + nj := updater.NewJobUpdater(job, c.KubeCli, c.PaddleCli, c.autoclean) + c.jobtracker.Store(key, nj) + jobUpdater = nj + } else { + var ok bool + jobUpdater, ok = jobUpdaterObj.(*updater.JobUpdater) + if !ok { + log.Debug("Conversion object error", "object", jobUpdaterObj) + return true, fmt.Errorf("ConversionError") + } + + if jobIsDeleted { + // clean job + log.Info("Deleting TrainingJob", "name", jobUpdater.FullName()) + if err := jobUpdater.Delete(); err != nil { + log.Error("Error deleting TrainingJob", "name", jobUpdater.FullName(), "err", err.Error()) + return false, nil + } + log.Info("Finishing deleting TrainingJob", "name", jobUpdater.FullName()) + c.jobtracker.Delete(key) + return true, nil + } + + if jobUpdater.UID() != job.ObjectMeta.UID { + // update job + log.Debug("TrainingJob UID changed", "namespace", job.Namespace, "name", job.Name) + jobUpdater.Update(job) + } } - _, ok := c.jobupdater.Load(key) - if !ok { - log.Info("create new job updater, key:", key) - nj, _ := updater.NewUpdater(job, c.KubeCli, c.PaddleCli) - c.jobupdater.Store(key, nj) + if err := jobUpdater.Reconcile(); err != nil { + log.Error("Error reconciling", "namespace", job.Namespace, "name", job.Name, "err", err.Error()) + return false, err } - return nil + currentPhase := jobUpdater.GetJob().Status.Phase + + if currentPhase 
== paddlev1.TrainingJobPhaseCreating || + currentPhase == paddlev1.TrainingJobPhaseRunning || + currentPhase == paddlev1.TrainingJobPhaseScaling { + c.workqueue.AddAfter(key, 3*time.Second) + log.Debug("TrainingJob put into workqueue again", "key", key, "current statue phase", currentPhase) + } + + return false, nil } diff --git a/pkg/updater/jobparser.go b/pkg/updater/jobparser.go index 82dc3128..5acd306b 100644 --- a/pkg/updater/jobparser.go +++ b/pkg/updater/jobparser.go @@ -200,7 +200,7 @@ func getEtcdPodSpec(job *paddlev1.TrainingJob) *corev1.Container { return &corev1.Container{ Name: "etcd", - Image: "quay.io/coreos/etcd:v3.2.1", + Image: "m3ngyang/etcd:v3.2.1", ImagePullPolicy: imagePullPolicy, // TODO(gongwb): etcd ports? Env: podEnv(job), diff --git a/pkg/updater/trainingJobUpdater.go b/pkg/updater/trainingJobUpdater.go deleted file mode 100644 index 9f40d5ca..00000000 --- a/pkg/updater/trainingJobUpdater.go +++ /dev/null @@ -1,589 +0,0 @@ -package updater - -import ( - "fmt" - "reflect" - "time" - - log "github.com/inconshreveable/log15" - - corev1 "k8s.io/api/core/v1" - "k8s.io/api/extensions/v1beta1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - - padv1 "github.com/paddlepaddle/edl/pkg/apis/paddlepaddle/v1" - trainingJobClient "github.com/paddlepaddle/edl/pkg/client/clientset/versioned" -) - -const ( - retry = 5 - retryTime = 5 * time.Second - convertedTimerTicker = 10 * time.Second - confirmResourceTicker = 5 * time.Second - eventChLength = 1000 - factor = 0.8 -) - -type trainingJobEventType string - -const ( - trainingJobEventDelete trainingJobEventType = "Delete" - trainingJobEventModify trainingJobEventType = "Modify" - trainingJobEventScale trainingJobEventType = "Scale" -) - -type trainingJobEvent struct { - // pet is the TrainingJobEventType of TrainingJob - pet trainingJobEventType - // The job transfer the information fo job - job *padv1.TrainingJob - // additional is the num to scale - additional int32 -} - -// TrainingJobUpdater is used to manage a specific TrainingJob -type TrainingJobUpdater struct { - // Job is the job the TrainingJob manager. - Job *padv1.TrainingJob - - // kubeClient is standard kubernetes client. - KubeClient kubernetes.Interface - - // TrainingJobClient is the client of TrainingJob. - TrainingJobClient trainingJobClient.Interface - - // Status is the status in memory, update when TrainingJob status changed and update the CRD resource status. - Status padv1.TrainingJobStatus - - // EventCh receives events from the controller, include Modify and Delete. - // When trainingJobEvent is Delete it will delete all resources - // The capacity is 1000. - EventCh chan *trainingJobEvent - - // Additional is the num scale. - Additional int32 -} - -// NewUpdater creates a new TrainingJobUpdater and start a goroutine to control current job. -func NewUpdater(job *padv1.TrainingJob, kubeClient kubernetes.Interface, trainingJobClient trainingJobClient.Interface) (*TrainingJobUpdater, error) { - log.Info("NewUpdater", "namespace", job.Namespace, "name", job.Name) - updater := &TrainingJobUpdater{ - Job: job, - KubeClient: kubeClient, - TrainingJobClient: trainingJobClient, - Status: job.Status, - EventCh: make(chan *trainingJobEvent, eventChLength), - } - go updater.start() - return updater, nil -} - -// Notify is used to receive event from controller. While controller receive a informer, -// it will notify updater to process the event. It send event to updater's eventCh. 
-func (updater *TrainingJobUpdater) notify(te *trainingJobEvent) { - updater.EventCh <- te - lene, cape := len(updater.EventCh), cap(updater.EventCh) - if lene > int(float64(cape)*factor) { - log.Warn("event capacity warning", "eventChannel", updater.Job.Name, "length", lene) - } -} - -// Delete send a delete event to updater, updater will kill the trainingjob and clear all the resource of the -// trainingjob. -func (updater *TrainingJobUpdater) Delete() { - updater.notify(&trainingJobEvent{pet: trainingJobEventDelete}) -} - -// Modify send a modify event to updater, updater will processing according to the situation. -func (updater *TrainingJobUpdater) Modify(nj *padv1.TrainingJob) { - updater.notify(&trainingJobEvent{pet: trainingJobEventModify, job: nj}) -} - -// Scale send a scale event to updater, updater will scale the job to desire replicas. -func (updater *TrainingJobUpdater) Scale() { - updater.notify(&trainingJobEvent{pet: trainingJobEventScale}) -} - -// TODO(m3ngyang) refactor update process -func (updater *TrainingJobUpdater) releaseResource(tp padv1.TrainingResourceType) error { - resource := new(v1beta1.ReplicaSet) - switch tp { - case padv1.Master: - resource = updater.Job.Spec.Master.ReplicaSpec - case padv1.Pserver: - resource = updater.Job.Spec.Pserver.ReplicaSpec - default: - return fmt.Errorf("unknow resource") - } - var replica int32 - resource.Spec.Replicas = &replica - _, err := updater.KubeClient.ExtensionsV1beta1().ReplicaSets(updater.Job.Namespace).Update(resource) - if errors.IsNotFound(err) { - return err - } - key := "paddle-job-" + tp - - labels := Labels(map[string]string{ - string(key): updater.Job.Name, - }) - - selector, _ := labels.LabelsParser() - options := metav1.ListOptions{ - LabelSelector: selector, - } - - for j := 0; j <= retry; j++ { - time.Sleep(confirmResourceTicker) - pl, err := updater.KubeClient.CoreV1().Pods(updater.Job.Namespace).List(options) - if err == nil && len(pl.Items) == 0 { - return nil - } - } - return updater.KubeClient.CoreV1().Pods(updater.Job.Namespace).DeleteCollection(&metav1.DeleteOptions{}, options) -} - -func (updater *TrainingJobUpdater) releaseMaster() error { - return updater.releaseResource(padv1.Master) -} - -func (updater *TrainingJobUpdater) releasePserver() error { - return updater.releaseResource(padv1.Pserver) -} - -func (updater *TrainingJobUpdater) releaseTrainer() error { - labels := Labels(map[string]string{ - "paddle-job": updater.Job.Name, - }) - selector, _ := labels.LabelsParser() - options := metav1.ListOptions{ - LabelSelector: selector, - } - - return updater.KubeClient.CoreV1().Pods(updater.Job.Namespace).DeleteCollection(&metav1.DeleteOptions{}, options) -} - -func (updater *TrainingJobUpdater) deleteTrainingJob() error { - fault := false - log.Info("Start to delete TrainingJob", "namespace", updater.Job.Namespace, "name", updater.Job.Name) - if updater.Job.Spec.FaultTolerant { - log.Info("Releasing master", "namespace", updater.Job.Namespace, "name", updater.Job.Spec.Master.ReplicaSpec.Name) - if err := updater.releaseMaster(); err != nil { - log.Error(err.Error()) - fault = true - } - } - - log.Info("Releasing pserver", "namespace", updater.Job.Namespace, "name", updater.Job.Spec.Pserver.ReplicaSpec.Name) - if err := updater.releasePserver(); err != nil { - log.Error(err.Error()) - fault = true - } - - if updater.Job.Spec.FaultTolerant { - log.Info("Deleting TrainingJob master metadata", "namespace", updater.Job.Namespace, "name", updater.Job.Spec.Master.ReplicaSpec.Name) - if err := 
updater.KubeClient.ExtensionsV1beta1().ReplicaSets(updater.Job.Namespace).Delete(updater.Job.Spec.Master.ReplicaSpec.Name, &metav1.DeleteOptions{}); err != nil { - if errors.IsNotFound(err) { - log.Error("Error deleting master replicaset", "error", err.Error()) - fault = true - } - } - } - - log.Info("Deleting TrainingJob pserver metadata", "namespace", updater.Job.Namespace, "name", updater.Job.Spec.Pserver.ReplicaSpec.Name) - if err := updater.KubeClient.ExtensionsV1beta1().ReplicaSets(updater.Job.Namespace).Delete(updater.Job.Spec.Pserver.ReplicaSpec.Name, &metav1.DeleteOptions{}); err != nil { - if !errors.IsNotFound(err) { - log.Error("Error deleting pserver replicaset", "error", err.Error()) - fault = true - } - } - - log.Info("Deletging TrainingJob trainer metadata", "namespace", updater.Job.Namespace, "name", updater.Job.Spec.Trainer.ReplicaSpec.Name) - if err := updater.KubeClient.BatchV1().Jobs(updater.Job.Namespace).Delete(updater.Job.Spec.Trainer.ReplicaSpec.Name, &metav1.DeleteOptions{}); err != nil { - if !errors.IsNotFound(err) { - log.Error("Error deleting trainer replicaset", err.Error()) - fault = true - } - } - - log.Info("Releasing TrainingJob trainer", "namespace", updater.Job.Namespace, "name", updater.Job.Spec.Trainer.ReplicaSpec.Name) - if err := updater.releaseTrainer(); err != nil { - log.Error("Error releasing trainer", err.Error()) - fault = true - } - - log.Info("End to delete TrainingJob", "namespace", updater.Job.Namespace, "name", updater.Job.Name) - - if fault { - return fmt.Errorf("delete resource error namespace=%v name=%v", updater.Job.Namespace, updater.Job.Name) - } - return nil -} - -func (updater *TrainingJobUpdater) createResource(tp padv1.TrainingResourceType) error { - resource := new(v1beta1.ReplicaSet) - switch tp { - case padv1.Master: - resource = updater.Job.Spec.Master.ReplicaSpec - case padv1.Pserver: - resource = updater.Job.Spec.Pserver.ReplicaSpec - default: - return fmt.Errorf("unknown resource") - } - for { - _, err := updater.KubeClient.ExtensionsV1beta1().ReplicaSets(updater.Job.Namespace).Get(resource.Name, metav1.GetOptions{}) - if errors.IsNotFound(err) { - log.Info("TrainingJob Not Found, start to create", "namespace", updater.Job.Namespace, "name", updater.Job.Name, "resourceName", resource.Name) - _, err = updater.KubeClient.ExtensionsV1beta1().ReplicaSets(updater.Job.Namespace).Create(resource) - if err != nil { - updater.Status.Phase = padv1.TrainingJobPhaseFailed - updater.Status.Reason = "Internal error; create resource error:" + err.Error() - return err - } - } else if err != nil { - log.Error("failed to get resource", "namespace", updater.Job.Namespace, "name", updater.Job.Name, "resourceName", resource.Name, "error", err.Error()) - time.Sleep(retryTime) - continue - } - ticker := time.NewTicker(confirmResourceTicker) - defer ticker.Stop() - for v := range ticker.C { - rs, err := updater.KubeClient.ExtensionsV1beta1().ReplicaSets(updater.Job.Namespace).Get(resource.Name, metav1.GetOptions{}) - log.Info("current status", "time", v.String(), "resourceName", resource.Name, "runningPodNum", rs.Status.ReadyReplicas) - if err != nil && !errors.IsServerTimeout(err) && !errors.IsTooManyRequests(err) { - updater.Status.Reason = "Internal error; create resource error:" + err.Error() - return err - } - if errors.IsServerTimeout(err) || errors.IsTooManyRequests(err) { - log.Warn("failed to connect to kubernetes", "error", err.Error()) - continue - } - if *resource.Spec.Replicas == 0 { - return fmt.Errorf("trainingjob is deleting, 
namespace=%v name=%v ", updater.Job.Namespace, updater.Job.Name) - - } - if rs.Status.ReadyReplicas == *resource.Spec.Replicas { - log.Info("create resource done", "namespace", updater.Job.Namespace, "name", updater.Job.Name, "resourceName", resource.Name) - return nil - } - } - } -} - -func (updater *TrainingJobUpdater) createTrainer() error { - resource := updater.Job.Spec.Trainer.ReplicaSpec - for { - _, err := updater.KubeClient.BatchV1().Jobs(updater.Job.Namespace).Get(resource.Name, metav1.GetOptions{}) - if errors.IsNotFound(err) { - log.Info("TrainingJob Not Found, start to create", "namespace", updater.Job.Namespace, "name", updater.Job.Name) - _, err = updater.KubeClient.BatchV1().Jobs(updater.Job.Namespace).Create(resource) - if err != nil { - updater.Status.Phase = padv1.TrainingJobPhaseFailed - updater.Status.Reason = "Internal error; create trainer error:" + err.Error() - return err - } - } else if err != nil { - log.Error("failed to get resource", "namespace", updater.Job.Namespace, "name", updater.Job.Name, "resourceName", resource.Name, "error", err.Error()) - time.Sleep(retryTime) - continue - } - updater.Status.Phase = padv1.TrainingJobPhaseRunning - updater.Status.Reason = "" - return nil - } -} - -func (updater *TrainingJobUpdater) createTrainingJob() error { - if updater.Job.Spec.FaultTolerant { - if err := updater.createResource(padv1.Master); err != nil { - return err - } - } - if err := updater.createResource(padv1.Pserver); err != nil { - return err - } - return updater.createTrainer() -} - -func (updater *TrainingJobUpdater) updateCRDStatus() error { - if reflect.DeepEqual(updater.Status, updater.Job.Status) { - return nil - } - newTrainingJob := updater.Job - newTrainingJob.Status = updater.Status - newTrainingJob, err := updater.TrainingJobClient.PaddlepaddleV1().TrainingJobs(updater.Job.Namespace).Update(newTrainingJob) - if err != nil { - return err - } - updater.Job = newTrainingJob - return nil -} - -// parseTrainingJob validates the fields and parses the TrainingJob -func (updater *TrainingJobUpdater) parseTrainingJob() error { - var parser DefaultJobParser - var createErr error - updater.Job, createErr = parser.NewTrainingJob(updater.Job) - - if createErr != nil { - updater.Status.Phase = padv1.TrainingJobPhaseFailed - updater.Status.Reason = createErr.Error() - return createErr - } - - updater.Status.Phase = padv1.TrainingJobPhaseCreating - updater.Status.Reason = "" - return nil -} - -func (updater *TrainingJobUpdater) getTrainerReplicaStatuses() ([]*padv1.TrainingResourceStatus, error) { - var replicaStatuses []*padv1.TrainingResourceStatus - trs := padv1.TrainingResourceStatus{ - TrainingResourceType: padv1.Trainer, - State: padv1.ResourceStateNone, - ResourceStates: make(map[padv1.ResourceState]int), - } - // TODO(ZhengQi): get detail status in future - replicaStatuses = append(replicaStatuses, &trs) - return replicaStatuses, nil -} - -// GetStatus get TrainingJob status from trainers. -func (updater *TrainingJobUpdater) GetStatus() (*padv1.TrainingJobStatus, error) { - status := updater.Status - j, err := updater.KubeClient.BatchV1().Jobs(updater.Job.Namespace). 
- Get(updater.Job.Spec.Trainer.ReplicaSpec.Name, metav1.GetOptions{}) - if err != nil { - log.Error("get trainer error:", err.Error()) - return &status, err - } - - status.ReplicaStatuses, err = updater.getTrainerReplicaStatuses() - if err != nil { - log.Error("get trainer replica status error:", err.Error()) - } - - if updater.Job.Spec.FaultTolerant { - // TODO(ZhengQi): should to confirm when job done - if j.Status.Failed == *updater.Job.Spec.Trainer.ReplicaSpec.Spec.Parallelism { - status.Phase = padv1.TrainingJobPhaseFailed - status.Reason = "all trainer have failed!" - } else { - if j.Status.Succeeded != 0 && j.Status.Active == 0 { - status.Phase = padv1.TrainingJobPhaseSucceeded - status.Reason = "Success!" - } - } - } else { - if j.Status.Failed != 0 { - status.Phase = padv1.TrainingJobPhaseFailed - status.Reason = "at least one trainer failed!" - } else { - if j.Status.Succeeded == *updater.Job.Spec.Trainer.ReplicaSpec.Spec.Parallelism && j.Status.Active == 0 { - status.Phase = padv1.TrainingJobPhaseSucceeded - status.Reason = "all trainer have succeeded!" - } - } - } - return &status, nil -} - -// Convert is main process to convert TrainingJob to desire status. -func (updater *TrainingJobUpdater) Convert() { - jobNs := updater.Job.Namespace - jobName := updater.Job.Name - log.Info("convert status", "namespace", jobNs, "name", jobName) - - if updater.Status.Phase == padv1.TrainingJobPhaseRunning || updater.Status.Phase == padv1.TrainingJobPhaseScaling { - status, err := updater.GetStatus() - if err != nil { - log.Error("failed to get current status", "namespace", jobNs, "name", jobName) - return - } - updater.Status = *status.DeepCopy() - log.Info("Current status", "namespace", jobNs, "name", jobName, "status", status) - err = updater.updateCRDStatus() - if err != nil { - log.Error("failed to update current status", "namespace", jobNs, "name", jobName, "error", err.Error()) - } - if updater.Status.Phase == padv1.TrainingJobPhaseSucceeded || updater.Status.Phase == padv1.TrainingJobPhaseFailed { - log.Info("Releasing Resource", "namespace", jobNs, "name", jobName) - if updater.Job.Spec.FaultTolerant { - log.Info("Release master", "namespace", jobNs, "name", updater.Job.Spec.Master.ReplicaSpec.Name) - if err := updater.releaseMaster(); err != nil { - log.Error(err.Error()) - } - } - log.Info("Release pserver", "namespace", jobNs, "name", updater.Job.Spec.Pserver.ReplicaSpec.Name) - if err := updater.releasePserver(); err != nil { - log.Error(err.Error()) - } - } - } -} - -// InitResource is used to parse trainingJob and create trainingJob resources. 
-func (updater *TrainingJobUpdater) InitResource() { - if updater.Status.Phase == padv1.TrainingJobPhaseNone { - log.Info("set up trainingJob", "namespace", updater.Job.Namespace, "name", updater.Job.Name) - // TODO mengyang - updater.parseTrainingJob() - err := updater.updateCRDStatus() - if err != nil { - log.Error("failed to set up trainingJob to update trainingJob status", "error", err.Error()) - } - } - - if updater.Status.Phase == padv1.TrainingJobPhaseCreating { - log.Info("creating trainingJob", "namespace", updater.Job.Namespace, "name", updater.Job.Name) - _ = updater.createTrainingJob() - err := updater.updateCRDStatus() - if err != nil { - log.Error("faield to update trainingJob status", "error", err.Error()) - } - if updater.Status.Phase == padv1.TrainingJobPhaseFailed { - log.Info("failed to release resource", "namespace", updater.Job.Namespace, "name", updater.Job.Name) - if updater.Job.Spec.FaultTolerant { - log.Info("releasing master", "namespace", updater.Job.Namespace, "name", updater.Job.Spec.Master.ReplicaSpec.Name) - if err := updater.releaseMaster(); err != nil { - log.Error(err.Error()) - } - } - - log.Info("releasing pserver", "namespace", updater.Job.Namespace, "name", updater.Job.Spec.Pserver.ReplicaSpec.Name) - if err := updater.releasePserver(); err != nil { - log.Error(err.Error()) - } - } - } -} - -func (updater *TrainingJobUpdater) scale(additional int32) *padv1.TrainerJobScaleRecord { - // TODO(m3ngyang) use events to record scale info - scaleRecord := &padv1.TrainerJobScaleRecord{ - ScaleTimestamp: metav1.NewTime(time.Now()), - Additional: additional, - } - jobNs := updater.Job.Namespace - jobName := updater.Job.Spec.Trainer.ReplicaSpec.Name - jobSpec, err := updater.KubeClient.BatchV1().Jobs(jobNs).Get(jobName, metav1.GetOptions{}) - if err != nil { - log.Error("failed to get current job status", "namespace", jobNs, "name", jobName, "error", err.Error()) - scaleRecord.Status = padv1.ScaleFalse - updater.Status.ScaleRecords.ScaleRecords = append(updater.Status.ScaleRecords.ScaleRecords, scaleRecord) - return scaleRecord - } - - newParallelism := *jobSpec.Spec.Parallelism + additional - // scaledown will cause pods terminiated with error code - newBackoffLimit := *jobSpec.Spec.BackoffLimit - if additional < 0 { - newBackoffLimit -= additional - } - jobSpec.Spec.Parallelism = &newParallelism - jobSpec.Spec.BackoffLimit = &newBackoffLimit - - log.Debug("scaling job", "namespace", jobNs, "name", jobName, "newSpec", jobSpec) - _, err = updater.KubeClient.BatchV1().Jobs(jobNs).Update(jobSpec) - if err != nil { - log.Error("failed to scale job", "namespace", jobNs, "name", jobName, "error", err.Error()) - scaleRecord.Status = padv1.ScaleFalse - scaleRecord.Reason = err.Error() - } else { - log.Info("successful to scale job", "namespace", jobNs, "name", jobName) - updater.Job.Spec.Trainer.ReplicaSpec.Spec.Parallelism = jobSpec.Spec.Parallelism - scaleRecord.Status = padv1.ScaleTrue - scaleRecord.Reason = "" - updater.Status.Phase = padv1.TrainingJobPhaseScaling - } - updater.Status.ScaleRecords.ScaleRecords = append(updater.Status.ScaleRecords.ScaleRecords, scaleRecord) - return scaleRecord -} - -func (updater *TrainingJobUpdater) syncScale() { - for { - STATUSCHECK: - log.Debug("sync scale status", "namespace", updater.Job.Namespace, "name", updater.Job.Name, "status", updater.Job.Status.Phase) - if updater.Status.Phase == padv1.TrainingJobPhaseSucceeded || updater.Status.Phase == padv1.TrainingJobPhaseFailed { - log.Info("Omit sync scale for job", "namespace", 
updater.Job.Namespace, "name", updater.Job.Name) - return - } - - pods, err := updater.KubeClient.CoreV1().Pods(updater.Job.Namespace).List(metav1.ListOptions{LabelSelector: "paddle-job=" + updater.Job.Name}) - if err != nil { - log.Error("failed to list pods", "namespace", updater.Job.Namespace, "name", updater.Job.Name, "error", err.Error()) - } - for _, pod := range pods.Items { - log.Debug("check pod status", "namespace", pod.Namespace, "pod", pod.Name, "status", pod.Status.Phase) - if pod.Status.Phase != corev1.PodRunning { - time.Sleep(time.Second) - goto STATUSCHECK - } - } - - log.Info("TrainingJob running now", "namespace", updater.Job.Namespace, "name", updater.Job.Name) - updater.Status.Phase = padv1.TrainingJobPhaseRunning - if err := updater.updateCRDStatus(); err != nil { - log.Error("failed to update trainingJob status", "namespace", updater.Job.Namespace, "name", updater.Job.Name, "error", err.Error()) - } - return - - } -} - -// scaleTrainingJob scale job up or down -func (updater *TrainingJobUpdater) scaleTrainingJob(additional int32) { - // The scale action will be omit if job have done. - if updater.Status.Phase == padv1.TrainingJobPhaseSucceeded || updater.Status.Phase == padv1.TrainingJobPhaseFailed { - log.Info("Omit scale for job have done", "namespace", updater.Job.Namespace, "name", updater.Job.Name, "additional", additional) - return - } - - // Scale job - scaleRecord := updater.scale(additional) - err := updater.updateCRDStatus() - if err != nil { - log.Warn("failed to scale trainingJob to update trainingJob status", "namespace", updater.Job.Namespace, "name", updater.Job.Name, "error", err.Error()) - } - - if scaleRecord.Status == padv1.ScaleTrue { - // Sync scale - go updater.syncScale() - } -} - -// Start is the main process of life cycle of a TrainingJob, including create resources, event process handle and -// status convert. 
-func (updater *TrainingJobUpdater) start() { - log.Info("start updater", "namespace", updater.Job.Namespace, "name", updater.Job.Name) - go updater.InitResource() - - ticker := time.NewTicker(convertedTimerTicker) - defer ticker.Stop() - log.Info("start ticker", "namespace", updater.Job.Namespace, "name", updater.Job.Name) - for { - select { - case ev := <-updater.EventCh: - switch ev.pet { - case trainingJobEventDelete: - log.Info("deleting updater", "namespace", updater.Job.Namespace, "name", updater.Job.Name) - if err := updater.deleteTrainingJob(); err != nil { - log.Error("failed to deleting updater", "namespace", updater.Job.Namespace, "name", updater.Job.Name, "error", err.Error()) - } - return - case trainingJobEventScale: - log.Info("scaling job", "namespace", updater.Job.Namespace, "name", updater.Job.Name, "additional", updater.Additional) - updater.scaleTrainingJob(updater.Additional) - } - case <-ticker.C: - updater.Convert() - if updater.Status.Phase == padv1.TrainingJobPhaseSucceeded || updater.Status.Phase == padv1.TrainingJobPhaseFailed { - if ticker != nil { - log.Info("stopping ticker", "namespace", updater.Job.Namespace, "name", updater.Job.Name) - ticker.Stop() - } - } - } - } -} diff --git a/pkg/updater/trainingjob_updater.go b/pkg/updater/trainingjob_updater.go new file mode 100644 index 00000000..4d8bd5e6 --- /dev/null +++ b/pkg/updater/trainingjob_updater.go @@ -0,0 +1,653 @@ +package updater + +import ( + "errors" + "fmt" + "reflect" + + log "github.com/inconshreveable/log15" + corev1 "k8s.io/api/core/v1" + "k8s.io/api/extensions/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" + + paddlev1 "github.com/paddlepaddle/edl/pkg/apis/paddlepaddle/v1" + trainingJobClient "github.com/paddlepaddle/edl/pkg/client/clientset/versioned" + "github.com/paddlepaddle/edl/pkg/client/clientset/versioned/scheme" +) + +var ( + // ErrorUnkownResourceType not supported resource + ErrorUnkownResourceType = errors.New("UnknownResourceType") +) + +// JobUpdater controls the life circle of one TrainingJob instance +type JobUpdater struct { + Job *paddlev1.TrainingJob + kubeCli kubernetes.Interface + trainingJobCli trainingJobClient.Interface + status paddlev1.TrainingJobStatus + recorder record.EventRecorder + autoclean bool + Additional int32 +} + +// NewJobUpdater returns JobUpdater instance +func NewJobUpdater(job *paddlev1.TrainingJob, kubeCli kubernetes.Interface, jobCli trainingJobClient.Interface, auto bool) *JobUpdater { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(log.Info) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeCli.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "TrainingJobController"}) + + return &JobUpdater{ + Job: job, + kubeCli: kubeCli, + trainingJobCli: jobCli, + status: *job.Status.DeepCopy(), + recorder: recorder, + autoclean: auto, + } +} + +// UID return uid of a job instance +func (j *JobUpdater) UID() types.UID { + return j.Job.ObjectMeta.UID +} + +// Update updates jobupdater's job instance +func (j *JobUpdater) Update(job *paddlev1.TrainingJob) { + log.Debug("Updating", "job", j.FullName(), "statue", job.Status) + j.Job = job +} + +// GetJob returns trainingjob instance +func (j *JobUpdater) GetJob() *paddlev1.TrainingJob { + 
return j.Job
+}
+
+// Delete deletes trainingjob instance
+func (j *JobUpdater) Delete() error {
+	return j.deleteTrainingJob()
+}
+
+// FullName returns job's namespace and name
+func (j *JobUpdater) FullName() string {
+	return fmt.Sprintf("%s/%s", j.Job.Namespace, j.Job.Name)
+}
+
+func (j *JobUpdater) masterName() string {
+	return fmt.Sprintf("%s/%s", j.Job.Namespace, j.Job.Spec.Master.ReplicaSpec.Name)
+}
+
+func (j *JobUpdater) pserverName() string {
+	return fmt.Sprintf("%s/%s", j.Job.Namespace, j.Job.Spec.Pserver.ReplicaSpec.Name)
+}
+
+func (j *JobUpdater) trainerName() string {
+	return fmt.Sprintf("%s/%s", j.Job.Namespace, j.Job.Spec.Trainer.ReplicaSpec.Name)
+}
+
+// Reconcile tries to get the job into the desired state
+func (j *JobUpdater) Reconcile() error {
+	log.Info("Reconciling TrainingJob", "job", j.FullName(), "current status phase", j.Job.Status.Phase)
+
+	if j.Job.ObjectMeta.DeletionTimestamp != nil {
+		log.Info("Deleted timestamp", "job", j.FullName(), "timestamp", j.Job.ObjectMeta.DeletionTimestamp.String())
+		return nil
+	}
+
+	if j.Job.Status.Phase == paddlev1.TrainingJobPhaseNone {
+		log.Info("Setting up", "job", j.FullName())
+		if err := j.setup(); err != nil {
+			j.status.Phase = paddlev1.TrainingJobPhaseFailed
+			j.status.Reason = err.Error()
+			log.Error("Error setting up TrainingJob", "job", j.FullName(), "err", err.Error())
+		} else {
+			j.status.Phase = paddlev1.TrainingJobPhaseCreating
+			log.Info("Finish setting up TrainingJob", "job", j.FullName())
+		}
+		if err := j.updateCRDStatus(); err != nil {
+			log.Error("Error updating TrainingJob", "job", j.FullName(), "err", err.Error())
+			return err
+		}
+	}
+
+	if j.Job.Status.Phase == paddlev1.TrainingJobPhaseCreating {
+		log.Info("Creating TrainingJob", "job", j.FullName())
+		if err := j.createTrainingJob(); err != nil {
+			log.Error("Error creating TrainingJob", "job", j.FullName(), "err", err.Error())
+			j.status.Phase = paddlev1.TrainingJobPhaseFailed
+			j.status.Reason = err.Error()
+		} else {
+			log.Info("Finish creating TrainingJob", "job", j.FullName())
+		}
+
+		if err := j.updateCRDStatus(); err != nil {
+			log.Error("Error updating TrainingJob", "job", j.FullName(), "err", err.Error())
+			return err
+		}
+
+		phase, reason, err := j.GetStatus()
+		log.Info("TrainingJob status after creating", "job", j.FullName(), "current phase", phase, "reason", reason, "err", err)
+		if err != nil {
+			log.Error("Error getting TrainingJob status", "job", j.FullName(), "err", err.Error())
+			return err
+		}
+
+		j.status.Phase = phase
+		j.status.Reason = reason
+
+		if err := j.updateCRDStatus(); err != nil {
+			log.Error("Error updating TrainingJob", "job", j.FullName(), "err", err.Error())
+			return err
+		}
+	}
+
+	if j.Job.Status.Phase == paddlev1.TrainingJobPhaseRunning {
+		phase, reason, err := j.GetStatus()
+		if err != nil {
+			log.Error("Error getting TrainingJob status", "job", j.FullName(), "err", err.Error())
+			return err
+		}
+
+		j.status.Phase = phase
+		j.status.Reason = reason
+		if err := j.updateCRDStatus(); err != nil {
+			log.Error("Error updating TrainingJob", "job", j.FullName(), "err", err.Error())
+			return err
+		}
+	}
+
+	if j.Job.Status.Phase == paddlev1.TrainingJobPhaseScaling {
+		if j.Additional != 0 {
+			if err := j.scale(); err != nil {
+				//TODO
+				return err
+			}
+			j.Additional = 0
+		}
+
+		phase, reason, err := j.GetStatus()
+		if err != nil {
+			log.Error("Error getting TrainingJob status", "job", j.FullName(), "err", err.Error())
+			return err
+		}
+
+		j.status.Phase = phase
+		j.status.Reason = reason
+		if err := j.updateCRDStatus(); err != nil {
log.Error("Error updating TrainingJob", "job", j.FullName(), "err", err.Error()) + return err + } + } + + if j.Job.Status.Phase == paddlev1.TrainingJobPhaseSucceeded || + j.Job.Status.Phase == paddlev1.TrainingJobPhaseFailed { + if j.autoclean { + log.Info("Releasing TrainingJob resource", "job", j.FullName(), "current status phase", j.Job.Status.Phase) + if err := j.releaseTrainer(); err != nil { + log.Error("Error releasing TrainingJob trainer resource", "job", j.FullName(), "err", err.Error()) + return err + } + log.Info("Finish releasing TrainingJob trainer resource", "job", j.FullName()) + + if err := j.releaseMasterRoles(); err != nil { + log.Error("Error releasing TrainingJob master/pserver resource", "job", j.FullName(), "err", err.Error()) + return err + } + log.Info("Finish releasing TrainingJob master/pserver resource", "job", j.FullName()) + + j.recorder.Event(j.Job, corev1.EventTypeNormal, "Terminated", "All pods cleaned") + } else { + j.recorder.Event(j.Job, corev1.EventTypeNormal, "Terminated", "All pods kept") + } + } + + if err := j.updateCRDStatus(); err != nil { + log.Error("Error updating TrainingJob", "job", j.FullName(), "err", err.Error()) + return err + } + + return nil +} + +func (j *JobUpdater) setup() error { + var parser DefaultJobParser + var err error + j.Job, err = parser.NewTrainingJob(j.Job) + if err != nil { + log.Error("error settting up", "job", j.FullName(), "err", err.Error()) + } + + return err +} + +func (j *JobUpdater) updateCRDStatus() error { + log.Debug("updating TrainingJob status", "job", j.FullName(), "former status", j.Job.Status, "current status", j.status) + if reflect.DeepEqual(j.status, j.Job.Status) { + log.Debug("update TrainingJob skipped", "job", j.FullName(), "status", j.status) + return nil + } + + newJob := j.Job + newJob.Status = j.status + // sync trainingjob to apiserver + newJob, err := j.trainingJobCli.PaddlepaddleV1().TrainingJobs(j.Job.Namespace).Update(newJob) + if err != nil { + return err + } + + j.Job = newJob + return nil +} + +// GetStatus get current status phase and reasion of job +func (j *JobUpdater) GetStatus() (paddlev1.TrainingJobPhase, string, error) { + phase := j.status.Phase + reason := "" + + trainers, err := j.kubeCli.BatchV1().Jobs(j.Job.Namespace).Get(j.Job.Spec.Trainer.ReplicaSpec.Name, v1.GetOptions{}) + if err != nil { + log.Error("error getting trainers", "name", j.trainerName(), "err", err.Error()) + return phase, reason, err + } + + // total running + totalRunning, err := j.jobTotalRunning() + if err != nil { + return phase, reason, err + } else if totalRunning { + phase = paddlev1.TrainingJobPhaseRunning + reason = "all pods are running" + } + + // the parallelism of batch/job trainer will be modified after success/failure + total := *j.Job.Spec.Trainer.ReplicaSpec.Spec.Parallelism + if j.Job.Spec.FaultTolerant { + if trainers.Status.Failed == total { + phase = paddlev1.TrainingJobPhaseFailed + reason = "all trainer instances have failed" + return phase, reason, nil + } else if trainers.Status.Succeeded == total && trainers.Status.Active == 0 { + phase = paddlev1.TrainingJobPhaseSucceeded + reason = "all trainer instances have done" + return phase, reason, nil + } + } else { + if trainers.Status.Failed != 0 { + failedPods, err := j.findFailedTrainerPods() + if err != nil { + return phase, reason, err + } + + podNameList := make([]string, 0) + podNodeList := make([]string, 0) + podReasonList := make([]string, 0) + for _, pod := range failedPods { + podNameList = append(podNameList, pod.Name) + 
podNodeList = append(podNodeList, pod.Status.HostIP) + podReasonList = append(podReasonList, pod.Status.Reason) + } + + phase = paddlev1.TrainingJobPhaseFailed + reason = fmt.Sprintf("trainer instances %s on %s have failed", podNameList, podNodeList) + podFailReason := fmt.Sprintf("trainer instances %s on %s have failed, detailed reasons: %s", podNameList, + podNodeList, podReasonList) + j.recorder.Event(j.Job, corev1.EventTypeWarning, "Pods Failed", podFailReason) + return phase, reason, nil + } else if trainers.Status.Succeeded == total && trainers.Status.Active == 0 { + phase = paddlev1.TrainingJobPhaseSucceeded + reason = "all trainer instances have done" + return phase, reason, nil + } + } + + if j.Additional != 0 { + phase = paddlev1.TrainingJobPhaseScaling + reason = fmt.Sprintf("need scale") + } + + return phase, reason, nil +} + +func (j *JobUpdater) createTrainingJob() error { + if j.Job.Spec.FaultTolerant { + log.Debug("creating master", "name", j.masterName()) + if err := j.createResource(paddlev1.MASTER); err != nil { + return err + } + } + + log.Debug("creatint pserver", "name", j.pserverName()) + if err := j.createResource(paddlev1.PSERVER); err != nil { + return err + } + + log.Debug("creating trainer", "name", j.trainerName()) + if err := j.createTrainer(); err != nil { + return err + } + + return nil +} + +func (j *JobUpdater) createResource(rt paddlev1.TrainingResourceType) error { + resource := new(v1beta1.ReplicaSet) + switch rt { + case paddlev1.MASTER: + resource = j.Job.Spec.Master.ReplicaSpec + case paddlev1.PSERVER: + resource = j.Job.Spec.Pserver.ReplicaSpec + default: + return ErrorUnkownResourceType + } + + if _, err := j.kubeCli.ExtensionsV1beta1().ReplicaSets(resource.Namespace).Get(resource.Name, v1.GetOptions{}); err != nil { + if apierrors.IsNotFound(err) { + if _, err := j.kubeCli.ExtensionsV1beta1().ReplicaSets(resource.Namespace).Create(resource); err != nil { + log.Error("error creating resource", "namespace", resource.Namespace, "name", resource.Name, "err", err.Error()) + return err + } + log.Debug("finish creating resource", "namespace", resource.Namespace, "name", resource.Name) + return nil + } + log.Error("error getting resource", "namespace", resource.Namespace, "name", resource.Name, "err", err.Error()) + return err + } + + log.Debug("resource already existing, skipping", "namespace", resource.Namespace, "name", resource.Name) + return nil +} + +func (j *JobUpdater) createTrainer() error { + if _, err := j.kubeCli.BatchV1().Jobs(j.Job.Namespace).Get(j.Job.Spec.Trainer.ReplicaSpec.Name, v1.GetOptions{}); err != nil { + if apierrors.IsNotFound(err) { + if _, err = j.kubeCli.BatchV1().Jobs(j.Job.Namespace).Create(j.Job.Spec.Trainer.ReplicaSpec); err != nil { + log.Error("error creating trainer", "name", j.trainerName(), "err", err.Error()) + return err + } + log.Debug("finishing creating trainer", "name", j.trainerName()) + return nil + } + log.Error("error getting trainer", "name", j.trainerName(), "err", err.Error()) + return err + } + + log.Debug("trainer already existing skipping", "name", j.trainerName()) + return nil +} + +func (j *JobUpdater) deleteTrainingJob() error { + if j.Job.Spec.FaultTolerant { + log.Debug("deleting master", "name", j.masterName()) + if err := j.deleteResource(paddlev1.MASTER); err != nil { + log.Error("error deleting master", "name", j.masterName(), "err", err.Error()) + return err + } + } + + log.Debug("deleting pserver", "name", j.pserverName()) + if err := j.deleteResource(paddlev1.PSERVER); err != nil { + 
log.Error("error deleting: %s, err: %s", j.pserverName(), err.Error()) + return err + } + + log.Debug("deleting trainer", "name", j.trainerName()) + if err := j.deleteTrainer(); err != nil { + log.Error("error deleting trainer", "name", j.trainerName(), "err", err.Error()) + return err + } + + return nil +} + +func (j *JobUpdater) deleteResource(rt paddlev1.TrainingResourceType) error { + if err := j.releaseResource(rt); err != nil { + return err + } + + resourceName := j.Job.Name + "-" + string(rt) + if err := j.kubeCli.ExtensionsV1beta1().ReplicaSets(j.Job.Namespace).Delete(resourceName, &v1.DeleteOptions{}); err != nil { + if apierrors.IsNotFound(err) { + log.Debug("resource not found, skipped", "namespace", j.Job.Namespace, "name", resourceName) + return nil + } + return err + } + log.Debug("finishing releasing", "namespace", j.Job.Namespace, "name", resourceName) + return nil +} + +func (j *JobUpdater) deleteTrainer() error { + if err := j.releaseTrainer(); err != nil { + return err + } + + if err := j.kubeCli.BatchV1().Jobs(j.Job.Namespace).Delete(j.Job.Spec.Trainer.ReplicaSpec.Name, &v1.DeleteOptions{}); err != nil { + if apierrors.IsNotFound(err) { + log.Debug("trainer not exist skipped", "name", j.trainerName()) + return nil + } + return err + } + log.Debug("finishing deleting trainer", "name", j.trainerName()) + return nil +} + +func (j *JobUpdater) releaseMasterRoles() error { + if j.Job.Spec.FaultTolerant { + if err := j.releaseResource(paddlev1.MASTER); err != nil { + log.Error("error releasing master", "name", j.masterName(), "err", err) + return err + } + } + + if err := j.releaseResource(paddlev1.PSERVER); err != nil { + log.Error("error releasing pserver", "name", j.pserverName(), "err", err) + return err + } + + return nil +} + +func (j *JobUpdater) releaseResource(rt paddlev1.TrainingResourceType) error { + resourceName := "" + switch rt { + case paddlev1.MASTER: + resourceName = j.Job.Spec.Master.ReplicaSpec.Name + case paddlev1.PSERVER: + resourceName = j.Job.Spec.Pserver.ReplicaSpec.Name + default: + return ErrorUnkownResourceType + } + + resource, getErr := j.kubeCli.ExtensionsV1beta1().ReplicaSets(j.Job.Namespace).Get(resourceName, v1.GetOptions{}) + if getErr != nil { + if apierrors.IsNotFound(getErr) { + log.Debug("resouce instance not exist, skipped", "namespace", j.Job.Namespace, "name", resourceName) + return nil + } + log.Error("error getting instance", "namespace", j.Job.Namespace, "name", resourceName, "err", getErr) + return getErr + } + + if *resource.Spec.Replicas != 0 { + var replicas int32 + replicas = 0 + resource.Spec.Replicas = &replicas + if _, err := j.kubeCli.ExtensionsV1beta1().ReplicaSets(j.Job.Namespace).Update(resource); err != nil { + log.Error("error setting replicas to 0", "namespace", j.Job.Namespace, "name", resourceName, "err", err.Error()) + return err + } + } + + if resource.Status.FullyLabeledReplicas != 0 { + key := "paddle-job-" + rt + labels := Labels(map[string]string{ + string(key): j.Job.Name, + }) + + selector, _ := labels.LabelsParser() + options := v1.ListOptions{ + LabelSelector: selector, + } + + if err := j.kubeCli.CoreV1().Pods(j.Job.Namespace).DeleteCollection(&v1.DeleteOptions{}, options); err != nil { + log.Error("error deleting resource pods", "namespace", j.Job.Namespace, "name", resourceName, "err", err.Error()) + return err + } + } + + return nil +} + +func (j *JobUpdater) releaseTrainer() error { + jobNs := j.Job.Namespace + jobName := j.Job.Spec.Trainer.ReplicaSpec.Name + + jobSpec, getErr := 
+ if getErr != nil {
+ if apierrors.IsNotFound(getErr) {
+ return nil
+ }
+ log.Error("error getting job spec for TrainingJob trainer", "name", j.trainerName(), "err", getErr.Error())
+ return getErr
+ }
+
+ if *jobSpec.Spec.Parallelism != 0 {
+ log.Debug("reset parallelism to zero for TrainingJob trainer", "name", j.trainerName())
+ var parallelism int32
+ parallelism = 0
+ jobSpec.Spec.Parallelism = &parallelism
+ if _, err := j.kubeCli.BatchV1().Jobs(jobNs).Update(jobSpec); err != nil {
+ log.Error("error resetting parallelism for TrainingJob trainer", "name", j.trainerName(), "err", err.Error())
+ return err
+ }
+ }
+
+ labels := Labels(map[string]string{
+ "paddle-job": j.Job.Name,
+ })
+ selector, _ := labels.LabelsParser()
+ options := v1.ListOptions{
+ LabelSelector: selector,
+ }
+
+ if err := j.kubeCli.CoreV1().Pods(jobNs).DeleteCollection(&v1.DeleteOptions{}, options); err != nil {
+ log.Error("error deleting pods of TrainingJob trainer", "name", j.trainerName(), "err", err.Error())
+ return err
+ }
+
+ return nil
+}
+
+func (j *JobUpdater) jobTotalRunning() (bool, error) {
+ if j.Job.Spec.FaultTolerant {
+ masterRunning, err := j.masterRoleTotalRunning(paddlev1.MASTER)
+ if err != nil {
+ return false, err
+ }
+ if !masterRunning {
+ return false, nil
+ }
+ }
+
+ pserverRunning, err := j.masterRoleTotalRunning(paddlev1.PSERVER)
+ if err != nil {
+ return false, err
+ }
+ if !pserverRunning {
+ return false, nil
+ }
+
+ return j.trainerTotalRunning()
+}
+
+func (j *JobUpdater) masterRoleTotalRunning(rt paddlev1.TrainingResourceType) (bool, error) {
+ var resourceName string
+ switch rt {
+ case paddlev1.MASTER:
+ resourceName = j.Job.Spec.Master.ReplicaSpec.Name
+ case paddlev1.PSERVER:
+ resourceName = j.Job.Spec.Pserver.ReplicaSpec.Name
+ default:
+ return false, ErrorUnkownResourceType
+ }
+ resource, err := j.kubeCli.ExtensionsV1beta1().ReplicaSets(j.Job.Namespace).Get(resourceName, v1.GetOptions{})
+ if err != nil {
+ return false, err
+ }
+
+ log.Debug("resource status", "namespace", j.Job.Namespace, "name", resourceName, "status", resource.Status)
+ if resource.Status.ReadyReplicas >= *resource.Spec.Replicas {
+ return true, nil
+ }
+ return false, nil
+}
+
+func (j *JobUpdater) trainerTotalRunning() (bool, error) {
+ trainerName := j.Job.Spec.Trainer.ReplicaSpec.Name
+ trainers, err := j.kubeCli.BatchV1().Jobs(j.Job.Namespace).Get(trainerName, v1.GetOptions{})
+ if err != nil {
+ return false, err
+ }
+
+ log.Debug("trainer status", "namespace", j.Job.Namespace, "name", trainerName, "status", trainers.Status)
+ podsList, err := j.kubeCli.CoreV1().Pods(j.Job.Namespace).List(v1.ListOptions{LabelSelector: "paddle-job=" + j.Job.Name})
+ if err != nil {
+ return false, err
+ }
+ var runningPodCount int32
+ for _, pod := range podsList.Items {
+ if pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodSucceeded {
+ runningPodCount++
+ }
+ }
+
+ if runningPodCount == *trainers.Spec.Parallelism {
+ return true, nil
+ }
+ return false, nil
+}
+
+func (j *JobUpdater) findFailedTrainerPods() ([]*corev1.Pod, error) {
+ failedPods := make([]*corev1.Pod, 0)
+
+ podsList, err := j.kubeCli.CoreV1().Pods(j.Job.Namespace).List(v1.ListOptions{LabelSelector: "paddle-job=" + j.Job.Name})
+ if err != nil {
+ return failedPods, err
+ }
+ for i := range podsList.Items {
+ if podsList.Items[i].Status.Phase == corev1.PodFailed {
+ failedPods = append(failedPods, &podsList.Items[i])
+ }
+ }
+
+ return failedPods, nil
+}
+
+func (j *JobUpdater) scale() (err error) {
+ jobNs := j.Job.Namespace
+ jobName := j.Job.Spec.Trainer.ReplicaSpec.Name
+ jobSpec, err
:= j.kubeCli.BatchV1().Jobs(jobNs).Get(jobName, v1.GetOptions{}) + if err != nil { + return err + } + + newParallelism := *jobSpec.Spec.Parallelism + j.Additional + newBackoffLimit := *jobSpec.Spec.BackoffLimit + if j.Additional < 0 { + newBackoffLimit -= j.Additional + } + jobSpec.Spec.Parallelism = &newParallelism + jobSpec.Spec.BackoffLimit = &newBackoffLimit + j.Job.Spec.Trainer.ReplicaSpec.Spec.Parallelism = &newParallelism + log.Debug("scaling job", "namespace", jobNs, "name", jobName, "new instance num", newParallelism) + if _, err := j.kubeCli.BatchV1().Jobs(jobNs).Update(jobSpec); err != nil { + log.Debug("failed to scale job", "namespace", jobNs, "name", jobName, "error", err.Error()) + return err + } + + return nil +} From 6617af06da748c551e589970288df9ff9a3900c5 Mon Sep 17 00:00:00 2001 From: m3ngyang Date: Mon, 16 Jul 2018 10:53:47 +0800 Subject: [PATCH 2/3] modify comments to pass golint check --- pkg/controller/trainingjob_controller.go | 29 +++++++++++------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/pkg/controller/trainingjob_controller.go b/pkg/controller/trainingjob_controller.go index 7aba2e50..e6ed957d 100644 --- a/pkg/controller/trainingjob_controller.go +++ b/pkg/controller/trainingjob_controller.go @@ -31,12 +31,12 @@ import ( // TrainingJobController defines the structure to manage TrainingJob resource type TrainingJobController struct { - // KubeCli is a standard kubernetes clientset - KubeCli kubernetes.Interface - // ApiCli is the extension kubernetes clientset - ApiCli apiextensionsclient.Interface - // PaddleCli is a clientset for our own API group - PaddleCli paddleclientset.Interface + // kubeCli is a standard kubernetes clientset + kubeCli kubernetes.Interface + // apiCli is the extension kubernetes clientset + apiCli apiextensionsclient.Interface + // paddleCli is a clientset for our own API group + paddleCli paddleclientset.Interface trainingjobLister paddlelisters.TrainingJobLister trainingjobSynced cache.InformerSynced @@ -55,6 +55,7 @@ type TrainingJobController struct { autoclean bool } +// New returns a TrainingJobController object func New( kubeCli kubernetes.Interface, apiCli apiextensionsclient.Interface, @@ -72,9 +73,9 @@ func New( recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "TrainingJobController"}) controller := &TrainingJobController{ - KubeCli: kubeCli, - ApiCli: apiCli, - PaddleCli: paddleCli, + kubeCli: kubeCli, + apiCli: apiCli, + paddleCli: paddleCli, trainingjobLister: traingingjobInformer.Lister(), trainingjobSynced: traingingjobInformer.Informer().HasSynced, jobtracker: new(sync.Map), @@ -126,7 +127,6 @@ func New( // is closed, at which point it will shutdown the workqueue and wait for // workers to finish processing their current work items. 
func (c *TrainingJobController) Run(threadiness int, maxLoadDesired float64, stopCh <-chan struct{}) error { - // TODO add a lock to ensure there is only one controller in the cluster defer runtime.HandleCrash() defer c.workqueue.ShutDown() @@ -147,12 +147,9 @@ func (c *TrainingJobController) Run(threadiness int, maxLoadDesired float64, sto go wait.Until(c.runWorker, time.Second, stopCh) } - // gc := NewGarbageCollector(c.KubeCli, c.trainingjobLister) - // go gc.CleanOrphans(10 * time.Minute) - log.Info("Started workers") - as := autoscaler.NewAutoscaler(c.KubeCli, c.jobtracker, autoscaler.WithMaxLoadDesired(maxLoadDesired)) + as := autoscaler.NewAutoscaler(c.kubeCli, c.jobtracker, autoscaler.WithMaxLoadDesired(maxLoadDesired)) as.Run() <-stopCh @@ -178,7 +175,7 @@ func (c *TrainingJobController) createCRD() error { }, } - _, err := c.ApiCli.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd) + _, err := c.apiCli.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd) if err != nil && !apierrors.IsAlreadyExists(err) { log.Error("Failed to create crd", "err", err.Error()) return err @@ -260,7 +257,7 @@ func (c *TrainingJobController) syncHandler(key string) (bool, error) { return true, fmt.Errorf("JobNotExists") } log.Debug("TrainingJob new", "namespace", job.Namespace, "name", job.Name) - nj := updater.NewJobUpdater(job, c.KubeCli, c.PaddleCli, c.autoclean) + nj := updater.NewJobUpdater(job, c.kubeCli, c.paddleCli, c.autoclean) c.jobtracker.Store(key, nj) jobUpdater = nj } else { From 189444b4633de65af5d7cee4fc71b3d5060c3c71 Mon Sep 17 00:00:00 2001 From: m3ngyang Date: Mon, 16 Jul 2018 11:05:20 +0800 Subject: [PATCH 3/3] fix travis problem --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index ed8fcca8..52d999bf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,7 +8,7 @@ matrix: - curl https://glide.sh/get | bash - sudo pip install pre-commit script: - - rm -f .copyright.hook && wget https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/.copyright.hook + - rm -f .copyright.hook && wget https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/tools/codestyle/copyright.hook - bash -x .tools/check_style.sh - ln -s $GOPATH/src/github.com/PaddlePaddle $GOPATH/src/github.com/paddlepaddle - cd $GOPATH/src/github.com/paddlepaddle/edl
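Note: the cleanup path introduced in this series (releaseResource and releaseTrainer) first scales the managed ReplicaSet or batch Job down to zero and then removes any leftover pods by label selector. The snippet below is a minimal standalone sketch of that pod-cleanup step, assuming the client-go API version vendored by this repository; cleanupJobPods is a hypothetical helper used only for illustration and is not part of this patch series.

package cleanup

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// cleanupJobPods deletes every pod in the given namespace carrying the
// "paddle-job=<jobName>" label, mirroring the DeleteCollection call that
// releaseTrainer issues after resetting the trainer Job's parallelism to zero.
func cleanupJobPods(kubeCli kubernetes.Interface, namespace, jobName string) error {
	opts := metav1.ListOptions{LabelSelector: "paddle-job=" + jobName}
	return kubeCli.CoreV1().Pods(namespace).DeleteCollection(&metav1.DeleteOptions{}, opts)
}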