diff --git a/pkg/localcontroller/db/db.go b/pkg/localcontroller/db/db.go index 38c133b4d..106ff471b 100644 --- a/pkg/localcontroller/db/db.go +++ b/pkg/localcontroller/db/db.go @@ -18,6 +18,7 @@ package db import ( "encoding/json" + "fmt" "os" "path/filepath" @@ -77,6 +78,18 @@ func SaveResource(name string, typeMeta, objectMeta, spec interface{}) error { return nil } +// GetResource gets resource info in db +func GetResource(name string) (*Resource, error) { + r := Resource{} + + queryResult := dbClient.Where("name = ?", name).First(&r) + if queryResult.RowsAffected == 0 { + return nil, fmt.Errorf("resource(name=%s) not in db", name) + } + + return &r, nil +} + // DeleteResource deletes resource info in db func DeleteResource(name string) error { var err error diff --git a/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go b/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go index 75ffa5967..c1619d2bc 100644 --- a/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go +++ b/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go @@ -41,15 +41,13 @@ import ( "github.com/kubeedge/sedna/pkg/localcontroller/trigger" "github.com/kubeedge/sedna/pkg/localcontroller/util" workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // IncrementalLearningJob defines config for incremental-learning-job type Job struct { sednav1.IncrementalLearningJob JobConfig *JobConfig - Dataset *dataset.Dataset - Storage storage.Storage - Done chan struct{} } // JobConfig defines config for incremental-learning-job @@ -71,6 +69,9 @@ type JobConfig struct { EvalModels []Model EvalResult []Model Lock sync.Mutex + Dataset *dataset.Dataset + Storage storage.Storage + Done chan struct{} } type Model = clienttypes.Model @@ -84,7 +85,7 @@ type OutputConfig struct { // DataSamples defines samples information type DataSamples struct { - Numbers int + PreviousNumbers int TrainSamples []string EvalVersionSamples [][]string EvalSamples []string @@ -114,6 +115,10 @@ const ( TriggerReadyStatus = "ready" // TriggerCompletedStatus is the completed status about trigger TriggerCompletedStatus = "completed" + + AnnotationsRoundsKey = "sedna.io/rounds" + AnnotationsNumberOfSamplesKey = "sedna.io/number-of-samples" + AnnotationsDataFileOfEvalKey = "sedna.io/data-file-of-eval" ) // New creates a incremental-learning-job manager @@ -139,7 +144,7 @@ func (im *Manager) Start() error { } // trainTask starts training task -func (im *Manager) trainTask(job *Job, currentRound int) error { +func (im *Manager) trainTask(job *Job) error { jobConfig := job.JobConfig latestCond := im.getLatestCondition(job) @@ -147,45 +152,58 @@ func (im *Manager) trainTask(job *Job, currentRound int) error { currentType := latestCond.Type if currentType == sednav1.ILJobStageCondWaiting { - if job.Dataset == nil { - return fmt.Errorf("job(name=%s) dataset not ready", jobConfig.UniqueIdentifier) - } - - err := im.loadTrainModel(job) - if err != nil { - return fmt.Errorf("failed to sync train model, and waiting it: %v", err) - } + var err error - if currentRound < jobConfig.Rounds { - currentRound = jobConfig.Rounds - initTriggerStatus(jobConfig) + err = im.loadDataset(job) + if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil { + return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w", + jobConfig.UniqueIdentifier, err) } - } - if currentType == sednav1.ILJobStageCondWaiting && 
jobConfig.TrainTriggerStatus == TriggerReadyStatus {
-        payload, ok, err := im.triggerTrainTask(job)
-        if !ok {
-            return nil
+        if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
+            return fmt.Errorf("job(%s)'s dataset not ready", jobConfig.UniqueIdentifier)
         }
+        err = im.loadTrainModel(job)
         if err != nil {
-            klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v",
-                jobConfig.UniqueIdentifier, jobStage, err)
-            return err
+            return fmt.Errorf("failed to sync train model, and waiting it: %w", err)
         }
-        err = im.Client.WriteMessage(payload, job.getHeader())
-        if err != nil {
-            klog.Errorf("job(name=%s) failed to write message: %v",
-                jobConfig.UniqueIdentifier, err)
-            return err
-        }
+        initTriggerStatus(jobConfig)
+
+        if jobConfig.TrainTriggerStatus == TriggerReadyStatus {
+            payload, ok, err := im.triggerTrainTask(job)
+            if !ok {
+                return nil
+            }
-        jobConfig.TrainTriggerStatus = TriggerCompletedStatus
-        jobConfig.Rounds++
-        forwardSamples(jobConfig, jobStage)
-        klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
-            jobConfig.UniqueIdentifier, jobStage)
+            if err != nil {
+                klog.Errorf("job(%s) failed to complete the %sing phase triggering task: %v",
+                    jobConfig.UniqueIdentifier, jobStage, err)
+                job.JobConfig.Rounds--
+                return err
+            }
+
+            err = im.Client.WriteMessage(payload, job.getHeader())
+            if err != nil {
+                klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err)
+                job.JobConfig.Rounds--
+                return err
+            }
+
+            forwardSamples(jobConfig, jobStage)
+
+            err = im.saveJobToDB(job)
+            if err != nil {
+                klog.Errorf("job(%s) failed to save job to db: %v",
+                    jobConfig.UniqueIdentifier, err)
+                // continue anyway
+            }
+
+            jobConfig.TrainTriggerStatus = TriggerCompletedStatus
+            klog.Infof("job(%s) completed the %sing phase triggering task successfully",
+                jobConfig.UniqueIdentifier, jobStage)
+        }
     }
     return nil
@@ -200,29 +218,39 @@ func (im *Manager) evalTask(job *Job) error {
     currentType := latestCond.Type
     if currentType == sednav1.ILJobStageCondWaiting {
-        err := im.loadDeployModel(job)
-        if err != nil {
-            return fmt.Errorf("failed to sync deploy model, and waiting it: %v", err)
-        }
-    }
+        var err error
-    if currentType == sednav1.ILJobStageCondWaiting && jobConfig.EvalTriggerStatus == TriggerReadyStatus {
-        payload, err := im.triggerEvalTask(job)
-        if err != nil {
-            klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v",
-                jobConfig.UniqueIdentifier, jobStage, err)
-            return err
+        err = im.loadDataset(job)
+        if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
+            return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w",
+                jobConfig.UniqueIdentifier, err)
         }
-        err = im.Client.WriteMessage(payload, job.getHeader())
+        err = im.loadDeployModel(job)
         if err != nil {
-            return err
+            return fmt.Errorf("failed to sync deploy model, and waiting it: %w", err)
         }
-        jobConfig.EvalTriggerStatus = TriggerCompletedStatus
-        forwardSamples(jobConfig, jobStage)
-        klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
-            jobConfig.UniqueIdentifier, jobStage)
+        if jobConfig.EvalTriggerStatus == TriggerReadyStatus {
+            payload, err := im.triggerEvalTask(job)
+            if err != nil {
+                klog.Errorf("job(%s) failed to complete the %sing phase triggering task: %v",
+                    jobConfig.UniqueIdentifier, jobStage, err)
+                return err
+            }
+
+            err = im.Client.WriteMessage(payload, job.getHeader())
+            if err != nil {
+                klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err)
+                return err
+            }
+
+            forwardSamples(jobConfig, jobStage)
+
+            jobConfig.EvalTriggerStatus = TriggerCompletedStatus
+            klog.Infof("job(%s) completed the %sing phase triggering task successfully",
+                jobConfig.UniqueIdentifier, jobStage)
+        }
     }
     return nil
@@ -236,41 +264,40 @@ func (im *Manager) deployTask(job *Job) {
     neededDeploy, err = im.triggerDeployTask(job)
     status := clienttypes.UpstreamMessage{Phase: string(sednav1.ILJobDeploy)}
+    models := im.getModelFromJobConditions(job, sednav1.ILJobDeploy)
-    if err == nil && neededDeploy {
-        deployModel, err := im.deployModel(job)
+    if err == nil && neededDeploy && models != nil {
+        trainedModel := models[0]
+        deployModel := models[1]
+        err = im.updateDeployModelFile(job, trainedModel.URL, deployModel.URL)
         if err != nil {
-            klog.Errorf("failed to deploy model for job(name=%s): %v", jobConfig.UniqueIdentifier, err)
-        } else {
-            klog.Infof("deployed model for job(name=%s) successfully", jobConfig.UniqueIdentifier)
-        }
-        if err != nil || deployModel == nil {
             status.Status = string(sednav1.ILJobStageCondFailed)
+            klog.Errorf("job(%s) failed to update model: %v", jobConfig.UniqueIdentifier, err)
         } else {
             status.Status = string(sednav1.ILJobStageCondReady)
-            status.Input = &clienttypes.Input{
-                Models: []Model{
-                    *deployModel,
-                },
-            }
+            klog.Infof("job(%s) updated model successfully", jobConfig.UniqueIdentifier)
+        }
+
+        status.Input = &clienttypes.Input{
+            Models: models,
         }
-        klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
+        klog.Infof("job(%s) completed the %sing phase triggering task successfully",
             jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
     } else {
         // No need to deploy, just report completed status
         // TODO: instead of reporting deploy-completed, another more reasonable status
-        klog.Infof("no need to deploy model for job(name=%s)", jobConfig.UniqueIdentifier)
+        klog.Infof("job(%s) doesn't need to deploy the model", jobConfig.UniqueIdentifier)
         status.Status = string(sednav1.ILJobStageCondCompleted)
     }
     err = im.Client.WriteMessage(status, job.getHeader())
     if err != nil {
-        klog.Errorf("job(name=%s) complete the %s task failed, error: %v",
+        klog.Errorf("job(%s) failed to complete the %s task: %v",
            jobConfig.UniqueIdentifier, sednav1.ILJobDeploy, err)
     }
-    klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
+    klog.Infof("job(%s) completed the %s task successfully", jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
 }
 // startJob starts a job
@@ -278,42 +305,32 @@ func (im *Manager) startJob(name string) {
     var err error
     job := im.IncrementalJobMap[name]
-    job.JobConfig = new(JobConfig)
-    jobConfig := job.JobConfig
-    jobConfig.UniqueIdentifier = name
-
-    err = im.initJob(job)
+    err = im.initJob(job, name)
     if err != nil {
-        klog.Errorf("failed to init job (name=%s): %+v", jobConfig.UniqueIdentifier)
+        klog.Errorf("failed to init job(%s): %+v", name, err)
         return
     }
-    klog.Infof("incremental job(name=%s) is started", name)
-    defer klog.Infof("incremental learning job(name=%s) is stopped", name)
+    klog.Infof("incremental learning job(%s) is started", name)
+    defer klog.Infof("incremental learning job(%s) is stopped", name)
-    cond := im.getLatestCondition(job)
-    currentType := cond.Type
-    jobStage := cond.Stage
-    if jobStage == sednav1.ILJobTrain && currentType == sednav1.ILJobStageCondWaiting {
-        go im.handleData(job)
-    }
-
-    currentRound := jobConfig.Rounds
+    // handle data from dataset
+    go im.handleData(job)
     tick := time.NewTicker(JobIterationIntervalSeconds * time.Second)
     for {
         select {
-        case <-job.Done:
+        case <-job.JobConfig.Done:
            return
         default:
        }
-        latestCond := im.getLatestCondition(job)
-        jobStage := latestCond.Stage
+        cond := im.getLatestCondition(job)
+        jobStage := cond.Stage
        switch jobStage {
        case sednav1.ILJobTrain:
-            err = im.trainTask(job, currentRound)
+            err = im.trainTask(job)
        case sednav1.ILJobEval:
            err = im.evalTask(job)
        default:
@@ -322,8 +339,7 @@
        }
        if err != nil {
-            klog.Errorf("job(name=%s) complete the %s task failed, error: %v",
-                jobConfig.UniqueIdentifier, jobStage, err)
+            klog.Errorf("job(%s) failed to complete the %s task: %v", name, jobStage, err)
        }
        <-tick.C
@@ -338,8 +354,6 @@ func (im *Manager) Insert(message *clienttypes.Message) error {
     job, ok := im.IncrementalJobMap[name]
     if !ok {
        job = &Job{}
-        job.Storage = storage.Storage{IsLocalStorage: false}
-        job.Done = make(chan struct{})
        im.IncrementalJobMap[name] = job
        first = true
     }
@@ -348,13 +362,6 @@
        return err
     }
-    credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey]
-    if credential != "" {
-        if err := job.Storage.SetCredential(credential); err != nil {
-            return fmt.Errorf("failed to set job(name=%s)'s storage credential, error: %+v", name, err)
-        }
-    }
-
     if first {
        go im.startJob(name)
     }
@@ -370,8 +377,8 @@ func (im *Manager) Insert(message *clienttypes.Message) error {
 func (im *Manager) Delete(message *clienttypes.Message) error {
     name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
-    if job, ok := im.IncrementalJobMap[name]; ok && job.Done != nil {
-        close(job.Done)
+    if job, ok := im.IncrementalJobMap[name]; ok && job.JobConfig.Done != nil {
+        close(job.JobConfig.Done)
     }
     delete(im.IncrementalJobMap, name)
@@ -383,13 +390,104 @@ func (im *Manager) Delete(message *clienttypes.Message) error {
     return nil
 }
+// updateJobFromDB updates job from db
+func (im *Manager) updateJobFromDB(job *Job) error {
+    var err error
+
+    previousJob, err := db.GetResource(job.JobConfig.UniqueIdentifier)
+    if err != nil {
+        return err
+    }
+
+    m := metav1.ObjectMeta{}
+    if err = json.Unmarshal([]byte(previousJob.ObjectMeta), &m); err != nil {
+        return err
+    }
+
+    rounds, ok := m.Annotations[AnnotationsRoundsKey]
+    if !ok {
+        return nil
+    }
+
+    if job.JobConfig.Rounds, err = strconv.Atoi(rounds); err != nil {
+        return err
+    }
+
+    numberOfSamples, ok := m.Annotations[AnnotationsNumberOfSamplesKey]
+    if !ok {
+        return nil
+    }
+
+    if job.JobConfig.DataSamples.PreviousNumbers, err = strconv.Atoi(numberOfSamples); err != nil {
+        return err
+    }
+
+    dataFileOfEval, ok := m.Annotations[AnnotationsDataFileOfEvalKey]
+    if !ok {
+        return nil
+    }
+
+    localURL, err := job.JobConfig.Storage.Download(dataFileOfEval, "")
+
+    if !job.JobConfig.Storage.IsLocalStorage {
+        defer os.RemoveAll(localURL)
+    }
+
+    if err != nil {
+        return err
+    }
+
+    samples, err := dataset.GetSamples(dataFileOfEval)
+    if err != nil {
+        klog.Errorf("read file %s failed: %v", dataFileOfEval, err)
+        return err
+    }
+
+    job.JobConfig.DataSamples.EvalVersionSamples = append(job.JobConfig.DataSamples.EvalVersionSamples, samples)
+
+    return nil
+}
+
+// saveJobToDB saves job info to db
+func (im *Manager) saveJobToDB(job *Job) error {
+    if job.ObjectMeta.Annotations == nil {
+        job.ObjectMeta.Annotations = make(map[string]string)
+    }
+    ann := job.ObjectMeta.Annotations
+
+    ann[AnnotationsRoundsKey] = strconv.Itoa(job.JobConfig.Rounds) +
ann[AnnotationsNumberOfSamplesKey] = strconv.Itoa(job.JobConfig.DataSamples.PreviousNumbers) + ann[AnnotationsDataFileOfEvalKey] = job.JobConfig.EvalDataURL + + return db.SaveResource(job.JobConfig.UniqueIdentifier, job.TypeMeta, job.ObjectMeta, job.Spec) +} + // initJob inits the job object -func (im *Manager) initJob(job *Job) error { +func (im *Manager) initJob(job *Job, name string) error { + job.JobConfig = new(JobConfig) + jobConfig := job.JobConfig + jobConfig.UniqueIdentifier = name + + jobConfig.Storage = storage.Storage{IsLocalStorage: false} + credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey] + if credential != "" { + if err := job.JobConfig.Storage.SetCredential(credential); err != nil { + return fmt.Errorf("failed to set storage credential: %w", err) + } + } + + jobConfig.Done = make(chan struct{}) jobConfig.Lock = sync.Mutex{} + jobConfig.Rounds = 0 + + jobConfig.DataSamples = &DataSamples{ + PreviousNumbers: 0, + TrainSamples: make([]string, 0), + EvalVersionSamples: make([][]string, 0), + EvalSamples: make([]string, 0), + } - jobConfig.Rounds = 1 - initTriggerStatus(jobConfig) trainTrigger, err := newTrigger(job.Spec.TrainSpec.Trigger) if err != nil { return fmt.Errorf("failed to init train trigger: %+w", err) @@ -403,13 +501,13 @@ func (im *Manager) initJob(job *Job) error { outputDir := job.Spec.OutputDir - isLocalURL, err := job.Storage.IsLocalURL(outputDir) + isLocalURL, err := jobConfig.Storage.IsLocalURL(outputDir) if err != nil { - return fmt.Errorf("job(name=%s)'s output dir is invalid, error: %+v", job.Name, outputDir) + return fmt.Errorf("job(%s)'s output dir(%s) is invalid: %+w", job.Name, outputDir, err) } if isLocalURL { - job.Storage.IsLocalStorage = true + jobConfig.Storage.IsLocalStorage = true outputDir = util.AddPrefixPath(im.VolumeMountPrefix, outputDir) } @@ -419,6 +517,12 @@ func (im *Manager) initJob(job *Job) error { return err } + if err := im.updateJobFromDB(job); err != nil { + klog.Errorf("job(%s) failed to update job from db: %v", name, err) + } + + initTriggerStatus(jobConfig) + return nil } @@ -442,42 +546,63 @@ func newTrigger(t sednav1.Trigger) (trigger.Base, error) { return trigger.NewTrigger(triggerMap) } -// getTrainOrEvalModel gets train model or eval model from job conditions -func (im *Manager) getTrainOrEvalModel(job *Job, jobStage sednav1.ILJobStage) *Model { +// getModelFromJobConditions gets model from job conditions for train/eval/deploy +func (im *Manager) getModelFromJobConditions(job *Job, jobStage sednav1.ILJobStage) []Model { jobConditions := job.Status.Conditions - // TODO: runtime.type changes to common.type for gm and lc - var models []runtime.Model - - for i := len(jobConditions) - 1; i >= 0; i-- { - var cond gmtypes.IncrementalCondData - jobCond := jobConditions[i] - if jobCond.Stage == sednav1.ILJobTrain && jobCond.Type == sednav1.ILJobStageCondCompleted { - if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil { - continue - } - - if cond.Output == nil { - continue + getModels := func(stage sednav1.ILJobStage, currentType sednav1.ILJobStageConditionType, dataType string) []runtime.Model { + // TODO: runtime.type changes to common.type for gm and lc + for i := len(jobConditions) - 1; i >= 0; i-- { + var cond gmtypes.IncrementalCondData + jobCond := jobConditions[i] + if jobCond.Stage == stage && jobCond.Type == currentType { + if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil { + continue + } + + if dataType == "input" { + if cond.Input == nil { + continue + } + + return 
cond.Input.Models + } else if dataType == "output" { + if cond.Output == nil { + continue + } + + return cond.Output.Models + } } - // models list has two model, first is deploy model, second is trained model - models = cond.Output.Models - - break } - } - // models must have two model file info which are output of train, - // first model will be used for inference if it evaluated as excellent, second model will be used for retaining. - if len(models) != 2 { return nil } switch jobStage { case sednav1.ILJobTrain: - return &Model{Format: models[1].Format, URL: models[1].URL} + // the second model is the pre-trained model of train stage. + models := getModels(sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output") + if models != nil { + return []Model{{Format: models[1].Format, URL: models[1].URL}} + } case sednav1.ILJobEval: - return &Model{Format: models[0].Format, URL: models[0].URL} + // the first model is the output model of train stage. + models := getModels(sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output") + if models != nil { + return []Model{{Format: models[0].Format, URL: models[0].URL}} + } + case sednav1.ILJobDeploy: + // two models for deploy stage: + // the first model is the output model of train stage, which was evaluated as better than the second model in eval stage. + // the second model is the serving model used in the inference worker. + var deployModels []Model + models := getModels(sednav1.ILJobEval, sednav1.ILJobStageCondReady, "input") + for _, m := range models { + deployModels = append(deployModels, Model{Format: m.Format, URL: m.URL}) + } + + return deployModels } return nil @@ -499,6 +624,8 @@ func (im *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) { return nil, false, nil } + job.JobConfig.Rounds++ + var m *Model latestCondition := im.getLatestCondition(job) @@ -506,24 +633,25 @@ func (im *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) { if rounds <= 1 { m = jobConfig.TrainModel } else { - m = im.getTrainOrEvalModel(job, latestCondition.Stage) - if m == nil { - return nil, false, err + models := im.getModelFromJobConditions(job, latestCondition.Stage) + if models != nil { + m = &models[0] } } var dataIndexURL string jobConfig.TrainDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.TrainSamples, - jobConfig.OutputConfig.SamplesOutput["train"], rounds, job.Dataset.Spec.Format, job.Dataset.URLPrefix) + jobConfig.OutputConfig.SamplesOutput["train"], rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix) if err != nil { - klog.Errorf("job(name=%s) train phase: write samples to the file(%s) is failed, error: %v", + job.JobConfig.Rounds-- + klog.Errorf("job(%s) train phase: write samples to the file(%s) is failed: %v", jobConfig.UniqueIdentifier, jobConfig.TrainDataURL, err) return nil, false, err } dataURL := jobConfig.TrainDataURL outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(rounds)}, "/") - if job.Storage.IsLocalStorage { + if jobConfig.Storage.IsLocalStorage { dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL) dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL) outputDir = util.TrimPrefixPath(im.VolumeMountPrefix, outputDir) @@ -540,6 +668,7 @@ func (im *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) { Status: string(sednav1.ILJobStageCondReady), Input: &input, } + jobConfig.TriggerTime = time.Now() return &msg, true, nil } @@ -551,12 +680,12 @@ func (im *Manager) triggerEvalTask(job *Job) 
(*clienttypes.UpstreamMessage, erro latestCondition := im.getLatestCondition(job) - m := im.getTrainOrEvalModel(job, latestCondition.Stage) - if m == nil { + ms := im.getModelFromJobConditions(job, latestCondition.Stage) + if ms == nil { return nil, err } - models := []Model{*m, { + models := []Model{ms[0], { Format: jobConfig.DeployModel.Format, URL: jobConfig.DeployModel.URL, }} @@ -564,17 +693,16 @@ func (im *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, erro jobConfig.EvalModels = models var dataIndexURL string - rounds := jobConfig.Rounds jobConfig.EvalDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"], - rounds, job.Dataset.Spec.Format, job.Dataset.URLPrefix) + job.JobConfig.Rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix) if err != nil { - klog.Errorf("job(name=%s) eval phase: write samples to the file(%s) is failed, error: %v", + klog.Errorf("job(%s) eval phase: write samples to the file(%s) is failed: %v", jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err) return nil, err } dataURL := jobConfig.EvalDataURL - if job.Storage.IsLocalStorage { + if jobConfig.Storage.IsLocalStorage { dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL) dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL) } @@ -652,32 +780,19 @@ func (im *Manager) triggerDeployTask(job *Job) (bool, error) { return jobConfig.DeployTrigger.Trigger(metricDelta), nil } -// deployModel deploys model -func (im *Manager) deployModel(job *Job) (*Model, error) { - jobConfig := job.JobConfig - - trainedModel := jobConfig.EvalModels[0].URL - deployModel := jobConfig.EvalModels[1].URL - if job.Storage.IsLocalStorage { +// updateDeployModelFile updates deploy model file +func (im *Manager) updateDeployModelFile(job *Job, trainedModel string, deployModel string) error { + if job.JobConfig.Storage.IsLocalStorage { trainedModel = util.AddPrefixPath(im.VolumeMountPrefix, trainedModel) } - if err := job.updateDeployModel(deployModel, trainedModel); err != nil { - return nil, err + if err := job.JobConfig.Storage.CopyFile(trainedModel, deployModel); err != nil { + return fmt.Errorf("failed to copy trained model(url=%s) to the deploy model(url=%s): %w", + trainedModel, deployModel, err) } - klog.Infof("job(name=%s) deploys model(url=%s) successfully", jobConfig.UniqueIdentifier, trainedModel) - - return &jobConfig.EvalModels[0], nil -} - -func (job *Job) updateDeployModel(deployModel string, newModel string) error { - if err := job.Storage.CopyFile(newModel, deployModel); err != nil { - return fmt.Errorf("copy model(url=%s) to the deploy model(url=%s) failed, error: %+v", - newModel, deployModel, err) - } + klog.V(4).Infof("copy trained model(url=%s) to the deploy model(url=%s) successfully", trainedModel, deployModel) - klog.Infof("copy model(url=%s) to the deploy model(url=%s) successfully", newModel, deployModel) return nil } @@ -687,16 +802,16 @@ func (job *Job) createOutputDir(jobConfig *JobConfig) error { dirNames := []string{"data/train", "data/eval", "train", "eval"} - if job.Storage.IsLocalStorage { + if job.JobConfig.Storage.IsLocalStorage { if err := util.CreateFolder(outputDir); err != nil { - klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, outputDir) + klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, outputDir, err) return err } for _, v := range dirNames { dir := path.Join(outputDir, v) if err := 
util.CreateFolder(dir); err != nil { - klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, dir) + klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, dir, err) return err } } @@ -772,7 +887,7 @@ func (im *Manager) loadDeployModel(job *Job) error { // loadDataset loads dataset information func (im *Manager) loadDataset(job *Job) error { - if job.Dataset != nil { + if job.JobConfig.Dataset != nil { // already loaded return nil } @@ -783,15 +898,7 @@ func (im *Manager) loadDataset(job *Job) error { return fmt.Errorf("not exists dataset(name=%s)", datasetName) } - jobConfig := job.JobConfig - jobConfig.DataSamples = &DataSamples{ - Numbers: 0, - TrainSamples: make([]string, 0), - EvalVersionSamples: make([][]string, 0), - EvalSamples: make([]string, 0), - } - - job.Dataset = dataset + job.JobConfig.Dataset = dataset return nil } @@ -803,49 +910,47 @@ func (im *Manager) handleData(job *Job) { iterCount := 0 for { select { - case <-job.Done: + case <-jobConfig.Done: return default: } - // in case dataset is not synced to LC before job synced to LC - // here call loadDataset in each period - err := im.loadDataset(job) if iterCount%100 == 0 { - klog.Infof("job(name=%s) handling dataset", jobConfig.UniqueIdentifier) + klog.V(4).Infof("job(%s) is handling dataset", jobConfig.UniqueIdentifier) } iterCount++ - if err != nil { - klog.Warningf("job(name=%s) failed to load dataset, and waiting it: %v", - jobConfig.UniqueIdentifier, - err) + + if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil { + // already loaded dataset <-tick.C continue } - dataset := job.Dataset + dataset := jobConfig.Dataset + currentNumberOfSamples := dataset.DataSource.NumberOfSamples + previousNumberOfSamples := jobConfig.DataSamples.PreviousNumbers - if dataset.DataSource != nil && len(dataset.DataSource.TrainSamples) > jobConfig.DataSamples.Numbers { + if dataset.DataSource != nil && currentNumberOfSamples > previousNumberOfSamples { samples := dataset.DataSource.TrainSamples - trainNum := int(job.Spec.Dataset.TrainProb * float64(len(samples)-jobConfig.DataSamples.Numbers)) + newNumberOfSamples := currentNumberOfSamples - previousNumberOfSamples + trainNum := int(job.Spec.Dataset.TrainProb * float64(newNumberOfSamples)) jobConfig.Lock.Lock() jobConfig.DataSamples.TrainSamples = append(jobConfig.DataSamples.TrainSamples, - samples[(jobConfig.DataSamples.Numbers+1):(jobConfig.DataSamples.Numbers+trainNum+1)]...) - klog.Infof("job(name=%s) current train samples nums is %d", - jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.TrainSamples)) + samples[(previousNumberOfSamples+1):(previousNumberOfSamples+trainNum+1)]...) + klog.Infof("job(%s)'s current train samples nums is %d", jobConfig.UniqueIdentifier, trainNum) jobConfig.DataSamples.EvalVersionSamples = append(jobConfig.DataSamples.EvalVersionSamples, - samples[(jobConfig.DataSamples.Numbers+trainNum+1):]) + samples[(previousNumberOfSamples+trainNum+1):]) jobConfig.Lock.Unlock() for _, v := range jobConfig.DataSamples.EvalVersionSamples { jobConfig.DataSamples.EvalSamples = append(jobConfig.DataSamples.EvalSamples, v...) 
} - klog.Infof("job(name=%s) current eval samples nums is %d", - jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples)) + evalNum := newNumberOfSamples - trainNum + klog.Infof("job(%s)'s current eval samples nums is %d", jobConfig.UniqueIdentifier, evalNum) - jobConfig.DataSamples.Numbers = len(samples) + jobConfig.DataSamples.PreviousNumbers = currentNumberOfSamples } <-tick.C @@ -866,10 +971,15 @@ func createFile(dir string, format string, isLocalStorage bool) (string, string) // writeSamples writes samples information to a file func (im *Manager) writeSamples(job *Job, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) { + if samples == nil { + return "", "", fmt.Errorf("not samples") + } + + jobConfig := job.JobConfig subDir := strings.Join([]string{dir, strconv.Itoa(rounds)}, "/") - fileURL, absURLFile := createFile(subDir, format, job.Dataset.Storage.IsLocalStorage) + fileURL, absURLFile := createFile(subDir, format, jobConfig.Dataset.Storage.IsLocalStorage) - if job.Storage.IsLocalStorage { + if jobConfig.Storage.IsLocalStorage { if err := util.CreateFolder(subDir); err != nil { return "", "", err } @@ -877,7 +987,7 @@ func (im *Manager) writeSamples(job *Job, samples []string, dir string, rounds i return "", "", err } - if !job.Dataset.Storage.IsLocalStorage { + if !jobConfig.Dataset.Storage.IsLocalStorage { tempSamples := util.ParsingDatasetIndex(samples, urlPrefix) if err := im.writeByLine(tempSamples, absURLFile); err != nil { return "", "", err @@ -892,13 +1002,13 @@ func (im *Manager) writeSamples(job *Job, samples []string, dir string, rounds i return "", "", err } - localFileURL, localAbsURLFile := createFile(temporaryDir, format, job.Dataset.Storage.IsLocalStorage) + localFileURL, localAbsURLFile := createFile(temporaryDir, format, jobConfig.Dataset.Storage.IsLocalStorage) if err := im.writeByLine(samples, localFileURL); err != nil { return "", "", err } - if err := job.Storage.Upload(localFileURL, fileURL); err != nil { + if err := jobConfig.Storage.Upload(localFileURL, fileURL); err != nil { return "", "", err } @@ -908,7 +1018,7 @@ func (im *Manager) writeSamples(job *Job, samples []string, dir string, rounds i return "", "", err } - if err := job.Storage.Upload(localAbsURLFile, absURLFile); err != nil { + if err := jobConfig.Storage.Upload(localAbsURLFile, absURLFile); err != nil { return "", "", err } @@ -931,12 +1041,12 @@ func (im *Manager) writeByLine(samples []string, fileURL string) error { _, _ = fmt.Fprintln(w, line) } if err := w.Flush(); err != nil { - klog.Errorf("write file(%s) failed", fileURL) + klog.Errorf("failed to write file(%s): %v", fileURL, err) return err } if err := file.Close(); err != nil { - klog.Errorf("close file failed, error: %v", err) + klog.Errorf("failed to close file(%s): %v", fileURL, err) return err } @@ -972,7 +1082,7 @@ func (im *Manager) monitorWorker() { } if err := im.Client.WriteMessage(msg, job.getHeader()); err != nil { - klog.Errorf("job(name=%s) failed to write message: %v", name, err) + klog.Errorf("job(%s) failed to write message: %v", name, err) continue } @@ -987,7 +1097,7 @@ func (im *Manager) handleWorkerMessage(job *Job, workerMessage workertypes.Messa workerKind := strings.ToLower(workerMessage.Kind) if jobStage != workerKind { - klog.Warningf("job(name=%s) %s phase get worker(kind=%s)", job.JobConfig.UniqueIdentifier, + klog.Warningf("job(%s)'s %s phase get worker(kind=%s)", job.JobConfig.UniqueIdentifier, jobStage, workerKind) return } @@ -1021,11 
+1131,11 @@ func (im *Manager) handleWorkerMessage(job *Job, workerMessage workertypes.Messa
     jobName := job.JobConfig.UniqueIdentifier
     if workerStatus == workertypes.CompletedStatus {
-        klog.Infof("job(name=%s) complete the %s task successfully", jobName, jobStage)
+        klog.Infof("job(%s) completed the %s task successfully", jobName, jobStage)
        switch latestCond.Stage {
        case sednav1.ILJobEval:
            job.JobConfig.EvalResult = models
-            // when eval worker is complete, the deploy task starts immediately without waiting for the notification of GM.
+            // when the eval worker is in completed status, the deploy task starts immediately without waiting for notification from the GM.
            im.deployTask(job)
        }
     }
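
Note on the recovery flow added by this patch: saveJobToDB records the job's progress (training rounds, the number of samples already consumed, and the eval data file) as sedna.io/* annotations on the ObjectMeta that db.SaveResource serializes into the local DB, and updateJobFromDB restores them via db.GetResource after a local-controller restart. Below is a minimal, self-contained sketch of that annotation round-trip only; it is illustrative and not part of the patch, the job name, counts, and file path are made-up values, and only the standard library plus k8s.io/apimachinery are assumed.

package main

import (
	"encoding/json"
	"fmt"
	"strconv"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Save side: progress is written into annotations before the ObjectMeta
	// is persisted (db.SaveResource stores it as a JSON string).
	saved := metav1.ObjectMeta{
		Name: "example-il-job", // hypothetical job name
		Annotations: map[string]string{
			"sedna.io/rounds":            strconv.Itoa(3),
			"sedna.io/number-of-samples": strconv.Itoa(500),
			"sedna.io/data-file-of-eval": "/output/data/eval/3/eval.txt", // made-up path
		},
	}
	raw, _ := json.Marshal(saved)

	// Recovery side: unmarshal the stored metadata and restore the counters,
	// mirroring what updateJobFromDB does with the result of db.GetResource.
	var restored metav1.ObjectMeta
	if err := json.Unmarshal(raw, &restored); err != nil {
		panic(err)
	}
	rounds, _ := strconv.Atoi(restored.Annotations["sedna.io/rounds"])
	previousSamples, _ := strconv.Atoi(restored.Annotations["sedna.io/number-of-samples"])
	fmt.Println(rounds, previousSamples, restored.Annotations["sedna.io/data-file-of-eval"])
}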