From f58213fcd50c3c54fc4c888625f3de95d0a66ffd Mon Sep 17 00:00:00 2001 From: JimmyYang20 Date: Tue, 24 May 2022 11:06:51 +0800 Subject: [PATCH] IL supports multiple device soc versions Signed-off-by: JimmyYang20 --- .../sedna.io_incrementallearningjobs.yaml | 7 + build/crds/sedna.io_models.yaml | 4 + .../v1alpha1/incrementallearningjob_types.go | 7 +- pkg/apis/sedna/v1alpha1/model_types.go | 5 +- .../sedna/v1alpha1/zz_generated.deepcopy.go | 28 ++- .../incrementallearning/downstream.go | 81 +++++-- pkg/globalmanager/runtime/types.go | 5 +- .../incrementallearningjob.go | 205 ++++++++++++------ 8 files changed, 245 insertions(+), 97 deletions(-) diff --git a/build/crds/sedna.io_incrementallearningjobs.yaml b/build/crds/sedna.io_incrementallearningjobs.yaml index 37b7507f7..53ed11586 100644 --- a/build/crds/sedna.io_incrementallearningjobs.yaml +++ b/build/crds/sedna.io_incrementallearningjobs.yaml @@ -7145,6 +7145,13 @@ spec: evalSpec: description: EvalSpec describes the data an eval worker should have properties: + initialEvalModel: + properties: + name: + type: string + required: + - name + type: object template: description: PodTemplateSpec describes the data a pod should have when created from a template diff --git a/build/crds/sedna.io_models.yaml b/build/crds/sedna.io_models.yaml index 036eb2d48..03a5fd9d3 100644 --- a/build/crds/sedna.io_models.yaml +++ b/build/crds/sedna.io_models.yaml @@ -38,6 +38,10 @@ spec: properties: credentialName: type: string + device_soc_versions: + items: + type: string + type: array format: type: string url: diff --git a/pkg/apis/sedna/v1alpha1/incrementallearningjob_types.go b/pkg/apis/sedna/v1alpha1/incrementallearningjob_types.go index 8838a4440..9c014edb6 100644 --- a/pkg/apis/sedna/v1alpha1/incrementallearningjob_types.go +++ b/pkg/apis/sedna/v1alpha1/incrementallearningjob_types.go @@ -57,7 +57,8 @@ type TrainSpec struct { // EvalSpec describes the data an eval worker should have type EvalSpec struct { - Template 
v1.PodTemplateSpec `json:"template"` + InitialModel *InitialEvalModel `json:"initialEvalModel,omitempty"` + Template v1.PodTemplateSpec `json:"template"` } // DeploySpec describes the deploy model to be updated @@ -94,6 +95,10 @@ type InitialModel struct { Name string `json:"name"` } +type InitialEvalModel struct { + Name string `json:"name"` +} + type DeployModel struct { Name string `json:"name"` // HotUpdateEnabled will enable the model hot update feature if its value is true. diff --git a/pkg/apis/sedna/v1alpha1/model_types.go b/pkg/apis/sedna/v1alpha1/model_types.go index 42f93f916..01e8b9b51 100644 --- a/pkg/apis/sedna/v1alpha1/model_types.go +++ b/pkg/apis/sedna/v1alpha1/model_types.go @@ -36,8 +36,9 @@ type Model struct { // ModelSpec is a description of a model type ModelSpec struct { - URL string `json:"url"` - Format string `json:"format"` + URL string `json:"url"` + Format string `json:"format"` + Devices []string `json:"device_soc_versions,omitempty"` CredentialName string `json:"credentialName,omitempty"` } diff --git a/pkg/apis/sedna/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sedna/v1alpha1/zz_generated.deepcopy.go index 12a355796..cf7d9b8a1 100644 --- a/pkg/apis/sedna/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/sedna/v1alpha1/zz_generated.deepcopy.go @@ -247,6 +247,11 @@ func (in *EdgeWorker) DeepCopy() *EdgeWorker { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EvalSpec) DeepCopyInto(out *EvalSpec) { *out = *in + if in.InitialModel != nil { + in, out := &in.InitialModel, &out.InitialModel + *out = new(InitialEvalModel) + **out = **in + } in.Template.DeepCopyInto(&out.Template) return } @@ -565,6 +570,22 @@ func (in *IncrementalLearningJobList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *InitialEvalModel) DeepCopyInto(out *InitialEvalModel) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InitialEvalModel. +func (in *InitialEvalModel) DeepCopy() *InitialEvalModel { + if in == nil { + return nil + } + out := new(InitialEvalModel) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *InitialModel) DeepCopyInto(out *InitialModel) { *out = *in @@ -983,7 +1004,7 @@ func (in *Model) DeepCopyInto(out *Model) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) return } @@ -1042,6 +1063,11 @@ func (in *ModelList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ModelSpec) DeepCopyInto(out *ModelSpec) { *out = *in + if in.Devices != nil { + in, out := &in.Devices, &out.Devices + *out = make([]string, len(*in)) + copy(*out, *in) + } return } diff --git a/pkg/globalmanager/controllers/incrementallearning/downstream.go b/pkg/globalmanager/controllers/incrementallearning/downstream.go index 84be05287..beee98383 100644 --- a/pkg/globalmanager/controllers/incrementallearning/downstream.go +++ b/pkg/globalmanager/controllers/incrementallearning/downstream.go @@ -106,10 +106,10 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro currentType := latestCondition.Type jobStage := latestCondition.Stage - syncModelWithName := func(modelName string) { - if err := c.syncModelWithName(dsNodeName, modelName, job.Namespace); err != nil { + syncModelWithName := func(modelName string, nodeName string) { + if err := c.syncModelWithName(nodeName, modelName, job.Namespace); err != nil { klog.Warningf("Error to sync model %s when sync 
incremental learning job %s to node %s: %v", - modelName, job.Name, dsNodeName, err) + modelName, job.Name, nodeName, err) } } @@ -131,29 +131,68 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro return false } - doJobStageEvent := func(modelName string, nodeName string) { - if currentType == sednav1.ILJobStageCondWaiting { + // delete job + deleteJob := func(nodeName string) { + if !isJobResidentNode(nodeName) { + // delete LC's job from nodeName that's different from dataset node when worker's status + // is completed or failed. + c.sendToEdgeFunc(nodeName, watch.Deleted, job) + } + } + + switch currentType { + case sednav1.ILJobStageCondWaiting: + switch jobStage { + case sednav1.ILJobTrain: + syncModelWithName(job.Spec.InitialModel.Name, dsNodeName) syncJobWithNodeName(dsNodeName) - if modelName != "" { - syncModelWithName(modelName) + case sednav1.ILJobEval: + syncModelWithName(job.Spec.DeploySpec.Model.Name, dsNodeName) + if job.Spec.EvalSpec.InitialModel != nil { + syncModelWithName(job.Spec.EvalSpec.InitialModel.Name, dsNodeName) } - } else if currentType == sednav1.ILJobStageCondRunning { - syncJobWithNodeName(nodeName) - } else if currentType == sednav1.ILJobStageCondCompleted || currentType == sednav1.ILJobStageCondFailed { - if !isJobResidentNode(nodeName) { - // delete LC's job from nodeName that's different from dataset node when worker's status is completed or failed. 
- c.sendToEdgeFunc(nodeName, watch.Deleted, job) + syncJobWithNodeName(dsNodeName) + case sednav1.ILJobDeploy: + deployNodeName = evalNodeName + + syncModelWithName(job.Spec.DeploySpec.Model.Name, evalNodeName) + if job.Spec.EvalSpec.InitialModel != nil && !job.Spec.DeploySpec.Model.HotUpdateEnabled { + syncModelWithName(job.Spec.EvalSpec.InitialModel.Name, deployNodeName) } + syncJobWithNodeName(deployNodeName) } - } + case sednav1.ILJobStageCondRunning: + switch jobStage { + case sednav1.ILJobTrain: + syncJobWithNodeName(trainNodeName) + case sednav1.ILJobEval: + if trainNodeName != evalNodeName && trainNodeName != dsNodeName { + c.sendToEdgeFunc(trainNodeName, watch.Deleted, job) + } + syncJobWithNodeName(evalNodeName) + case sednav1.ILJobDeploy: + if evalNodeName != deployNodeName && evalNodeName != dsNodeName { + c.sendToEdgeFunc(evalNodeName, watch.Deleted, job) + } - switch jobStage { - case sednav1.ILJobTrain: - doJobStageEvent(job.Spec.InitialModel.Name, trainNodeName) - case sednav1.ILJobEval: - doJobStageEvent(job.Spec.DeploySpec.Model.Name, evalNodeName) - case sednav1.ILJobDeploy: - doJobStageEvent("", deployNodeName) + if job.Spec.EvalSpec.InitialModel != nil { + syncModelWithName(job.Spec.EvalSpec.InitialModel.Name, deployNodeName) + } + syncModelWithName(job.Spec.DeploySpec.Model.Name, deployNodeName) + syncJobWithNodeName(deployNodeName) + } + case sednav1.ILJobStageCondCompleted, sednav1.ILJobStageCondFailed: + if !job.Spec.DeploySpec.Model.HotUpdateEnabled { + deployNodeName = evalNodeName + } + switch jobStage { + case sednav1.ILJobTrain: + deleteJob(trainNodeName) + case sednav1.ILJobEval: + deleteJob(evalNodeName) + case sednav1.ILJobDeploy: + deleteJob(deployNodeName) + } } return nil diff --git a/pkg/globalmanager/runtime/types.go b/pkg/globalmanager/runtime/types.go index 237bf92ba..3b0d99baa 100644 --- a/pkg/globalmanager/runtime/types.go +++ b/pkg/globalmanager/runtime/types.go @@ -55,8 +55,9 @@ const ( ) type Model struct { - Format 
string `json:"format,omitempty"` - URL string `json:"url,omitempty"` + Format string `json:"format"` + URL string `json:"url"` + Devices []string `json:"device_soc_versions,omitempty"` Metrics map[string]interface{} `json:"metrics,omitempty"` } diff --git a/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go b/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go index d9c62e8d4..a0cffdbdd 100644 --- a/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go +++ b/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go @@ -70,7 +70,8 @@ type JobConfig struct { DataSamples *DataSamples TrainModel *Model DeployModel *Model - EvalModels []Model + EvalModel *Model + EvalResult []Model Lock sync.Mutex Dataset *dataset.Dataset Storage storage.Storage @@ -234,6 +235,17 @@ func (im *Manager) evalTask(job *Job) error { return fmt.Errorf("failed to sync deploy model, and waiting it: %w", err) } + if job.Spec.EvalSpec.InitialModel != nil { + err = im.loadEvalModel(job) + if err != nil { + return fmt.Errorf("failed to sync initial eval model, and waiting it: %w", err) + } + } else { + if jobConfig.EvalModel == nil { + jobConfig.EvalModel = jobConfig.DeployModel + } + } + if jobConfig.EvalTriggerStatus == TriggerReadyStatus { payload, err := im.triggerEvalTask(job) if err != nil { @@ -261,20 +273,29 @@ func (im *Manager) evalTask(job *Job) error { // hotModelUpdateDeployTask starts deploy task when job supports hot model update func (im *Manager) hotModelUpdateDeployTask(job *Job) error { + var localModelConfigFile string + if v, ok := job.ObjectMeta.Annotations[runtime.ModelHotUpdateAnnotationsKey]; ok { + localModelConfigFile = v + } else { + return nil + } + if job.JobConfig.HotModelUpdateDeployTriggerStatus == TriggerReadyStatus { - var localModelConfigFile string - if v, ok := job.ObjectMeta.Annotations[runtime.ModelHotUpdateAnnotationsKey]; ok { - localModelConfigFile = v - } else { - return 
nil + var err error + err = im.loadDeployModel(job) + if err != nil { + return fmt.Errorf("failed to sync deploy model, and waiting it: %w", err) } - models := im.getJobStageModel(job, sednav1.ILJobDeploy) - if models == nil { - return nil + if job.Spec.EvalSpec.InitialModel != nil { + err = im.loadEvalModel(job) + if err != nil { + return fmt.Errorf("failed to sync initial eval model, and waiting it: %w", err) + } } - trainedModel := models[0] - deployModel := models[1] + + trainedModel := im.getModelFromJobConditions(job, sednav1.ILJobDeploy) + deployModel := job.JobConfig.DeployModel trainedModelURL := trainedModel.URL modelName := filepath.Base(trainedModelURL) @@ -291,6 +312,14 @@ func (im *Manager) hotModelUpdateDeployTask(job *Job) error { return err } + evalModel := job.JobConfig.EvalModel + if evalModel != nil { + newEvalModel := im.getModelFromJobConditions(job, sednav1.ILJobEval) + if err := im.updateDeployModelFile(job, newEvalModel.URL, evalModel.URL); err != nil { + return err + } + } + config := map[string]map[string]string{ "model_config": { "model_path": strings.Replace(localHostModelFile, localHostDir, @@ -325,27 +354,47 @@ func (im *Manager) hotModelUpdateDeployTask(job *Job) error { // deployTask starts deploy task func (im *Manager) deployTask(job *Job) error { if job.JobConfig.DeployTriggerStatus == TriggerReadyStatus { + if err := im.loadDeployModel(job); err != nil { + return fmt.Errorf("failed to sync deploy model, and waiting it: %w", err) + } + + if !job.Spec.DeploySpec.Model.HotUpdateEnabled && job.Spec.EvalSpec.InitialModel != nil { + err := im.loadEvalModel(job) + if err != nil { + return fmt.Errorf("failed to sync initial eval model, and waiting it: %w", err) + } + } + jobConfig := job.JobConfig var err error var neededDeploy bool neededDeploy, err = im.triggerDeployTask(job) status := clienttypes.UpstreamMessage{Phase: string(sednav1.ILJobDeploy)} - models := im.getJobStageModel(job, sednav1.ILJobDeploy) - if err == nil && 
neededDeploy && models != nil { + if err == nil && neededDeploy { + var models []Model + trainedModel := im.getModelFromJobConditions(job, sednav1.ILJobDeploy) + deployModel := jobConfig.DeployModel + models = append(models, *trainedModel, *deployModel) + if !job.Spec.DeploySpec.Model.HotUpdateEnabled { - trainedModel := models[0] - deployModel := models[1] err = im.updateDeployModelFile(job, trainedModel.URL, deployModel.URL) if err != nil { status.Status = string(sednav1.ILJobStageCondFailed) klog.Errorf("failed to update model for job(%s): %v", jobConfig.UniqueIdentifier, err) - return err + } else { + status.Status = string(sednav1.ILJobStageCondReady) + klog.Infof("update model for job(%s) successfully", jobConfig.UniqueIdentifier) } - status.Status = string(sednav1.ILJobStageCondReady) - klog.Infof("job(%s) completed the %s task successfully", jobConfig.UniqueIdentifier, sednav1.ILJobDeploy) + evalModel := job.JobConfig.EvalModel + if evalModel != nil { + newEvalModel := im.getModelFromJobConditions(job, sednav1.ILJobEval) + if err := im.updateDeployModelFile(job, newEvalModel.URL, evalModel.URL); err != nil { + return err + } + } } else { status.Status = string(sednav1.ILJobStageCondReady) } @@ -367,11 +416,11 @@ func (im *Manager) deployTask(job *Job) error { if err != nil { klog.Errorf("job(%s) completed the %s task failed: %v", jobConfig.UniqueIdentifier, sednav1.ILJobDeploy, err) - return err } job.JobConfig.DeployTriggerStatus = TriggerCompletedStatus } + return nil } @@ -700,37 +749,48 @@ func (im *Manager) getEvalResult(job *Job) ([]map[string][]float64, error) { return result, err } -// getJobStageModel gets model from job conditions for train/eval/deploy -func (im *Manager) getJobStageModel(job *Job, jobStage sednav1.ILJobStage) []Model { +// getModelFromJobConditions gets model from job conditions for train/eval/deploy +func (im *Manager) getModelFromJobConditions(job *Job, jobStage sednav1.ILJobStage) *Model { jobConditions := 
job.Status.Conditions + jobConfig := job.JobConfig + getModel := func(initModel *Model, models []Model) *Model { + for _, m := range models { + if m.Format == initModel.Format { + if initModel.Devices != nil && len(m.Devices) == 1 { + for _, d := range initModel.Devices { + if m.Devices[0] == d { + return &m + } + } + } else { + return &m + } + } + } + + return nil + } + + models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output") + if models == nil { + return nil + } + + var model *Model switch jobStage { case sednav1.ILJobTrain: - // the second model is the pre-trained model of train stage. - models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output") - if models != nil { - return []Model{{Format: models[1].Format, URL: models[1].URL}} - } + model = jobConfig.TrainModel case sednav1.ILJobEval: - // the first model is the output model of train stage. - models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output") - if models != nil { - return []Model{{Format: models[0].Format, URL: models[0].URL}} - } + model = jobConfig.EvalModel case sednav1.ILJobDeploy: - // two models for deploy stage: - // the first model is the output model of train stage, which was evaluated as better than the second model in eval stage. - // the second model is the serving model used in the inference worker. 
- var deployModels []Model - models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobEval, sednav1.ILJobStageCondReady, "input") - for _, m := range models { - deployModels = append(deployModels, Model{Format: m.Format, URL: m.URL}) - } - - return deployModels + model = jobConfig.DeployModel + } + if model == nil { + return nil } - return nil + return getModel(model, models) } // triggerTrainTask triggers the train task @@ -752,16 +812,11 @@ func (im *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) { job.JobConfig.Rounds++ var m *Model - - latestCondition := im.getLatestCondition(job) rounds := jobConfig.Rounds if rounds <= 1 { m = jobConfig.TrainModel } else { - models := im.getJobStageModel(job, latestCondition.Stage) - if models != nil { - m = &models[0] - } + m = im.getModelFromJobConditions(job, sednav1.ILJobTrain) } var dataIndexURL string @@ -803,19 +858,10 @@ func (im *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, erro jobConfig := job.JobConfig var err error - latestCondition := im.getLatestCondition(job) + m := im.getModelFromJobConditions(job, sednav1.ILJobEval) - ms := im.getJobStageModel(job, latestCondition.Stage) - if ms == nil { - return nil, err - } - - models := []Model{ms[0], { - Format: jobConfig.DeployModel.Format, - URL: jobConfig.DeployModel.URL, - }} - // EvalModels has two models, first is trained model, second is deployed model - jobConfig.EvalModels = models + var models []Model + models = append(models, *m, *jobConfig.EvalModel) var dataIndexURL string jobConfig.EvalDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"], @@ -825,6 +871,7 @@ func (im *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, erro jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err) return nil, err } + jobConfig.DataSamples.EvalSamples = []string{} dataURL := jobConfig.EvalDataURL if jobConfig.Storage.IsLocalStorage { @@ 
-851,9 +898,9 @@ func (im *Manager) triggerDeployTask(job *Job) (bool, error) { jobConfig := job.JobConfig evalResult, err := im.getEvalResult(job) - if err != nil && len(evalResult) < 2 { - klog.Errorf("job(name=%s failed to get eval result(%v): %+w", job.Name, evalResult, err) - return false, err + // EvalResult must have info for two models: the first is the trained model, the second is the deployed model. + if len(evalResult) != 2 { + return false, fmt.Errorf("expected 2 evaluation results, actual: %d", len(jobConfig.EvalResult)) } newMetrics := evalResult[0] @@ -966,10 +1013,27 @@ func (im *Manager) loadTrainModel(job *Job) error { } jobConfig.TrainModel = new(Model) - format := initialModel.Spec.Format - url := initialModel.Spec.URL - jobConfig.TrainModel.Format = format - jobConfig.TrainModel.URL = url + jobConfig.TrainModel.Format = initialModel.Spec.Format + jobConfig.TrainModel.URL = initialModel.Spec.URL + jobConfig.TrainModel.Devices = initialModel.Spec.Devices + } + return nil +} + +// loadEvalModel loads initial model information for eval. 
+func (im *Manager) loadEvalModel(job *Job) error { + jobConfig := job.JobConfig + + if jobConfig.EvalModel == nil { + initialModel, err := im.getModel(job.Namespace, job.Spec.EvalSpec.InitialModel.Name) + if err != nil { + return err + } + + jobConfig.EvalModel = new(Model) + jobConfig.EvalModel.Format = initialModel.Spec.Format + jobConfig.EvalModel.URL = initialModel.Spec.URL + jobConfig.EvalModel.Devices = initialModel.Spec.Devices } return nil } @@ -979,14 +1043,15 @@ func (im *Manager) loadDeployModel(job *Job) error { jobConfig := job.JobConfig if jobConfig.DeployModel == nil { - evalModel, err := im.getModel(job.Namespace, job.Spec.DeploySpec.Model.Name) + deployModel, err := im.getModel(job.Namespace, job.Spec.DeploySpec.Model.Name) if err != nil { return err } jobConfig.DeployModel = new(Model) - jobConfig.DeployModel.Format = evalModel.Spec.Format - jobConfig.DeployModel.URL = evalModel.Spec.URL + jobConfig.DeployModel.Format = deployModel.Spec.Format + jobConfig.DeployModel.URL = deployModel.Spec.URL + jobConfig.DeployModel.Devices = deployModel.Spec.Devices } return nil }