Skip to content

Commit

Permalink
IL: LC supports recovering jobs when restarted
Browse files Browse the repository at this point in the history
Signed-off-by: JimmyYang20 <[email protected]>
JimmyYang20 committed Aug 13, 2021
1 parent 626a892 commit bf39a7c
Showing 2 changed files with 332 additions and 209 deletions.
13 changes: 13 additions & 0 deletions pkg/localcontroller/db/db.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ package db

import (
"encoding/json"
"fmt"
"os"
"path/filepath"

@@ -77,6 +78,18 @@ func SaveResource(name string, typeMeta, objectMeta, spec interface{}) error {
return nil
}

// GetResource gets the resource info with the given name from db.
// It returns an error when the underlying query fails or when no
// matching record exists.
func GetResource(name string) (*Resource, error) {
	r := Resource{}

	queryResult := dbClient.Where("name = ?", name).First(&r)
	// Surface real database errors instead of masking them behind the
	// generic "not in db" message (RowsAffected is also 0 on failure).
	if err := queryResult.Error; err != nil {
		return nil, fmt.Errorf("failed to query resource(name=%s) in db: %w", name, err)
	}
	if queryResult.RowsAffected == 0 {
		return nil, fmt.Errorf("resource(name=%s) not in db", name)
	}

	return &r, nil
}

// DeleteResource deletes resource info in db
func DeleteResource(name string) error {
var err error
Original file line number Diff line number Diff line change
@@ -41,15 +41,13 @@ import (
"github.com/kubeedge/sedna/pkg/localcontroller/trigger"
"github.com/kubeedge/sedna/pkg/localcontroller/util"
workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// IncrementalLearningJob defines config for incremental-learning-job
type Job struct {
sednav1.IncrementalLearningJob
JobConfig *JobConfig
Dataset *dataset.Dataset
Storage storage.Storage
Done chan struct{}
}

// JobConfig defines config for incremental-learning-job
@@ -71,6 +69,9 @@ type JobConfig struct {
EvalModels []Model
EvalResult []Model
Lock sync.Mutex
Dataset *dataset.Dataset
Storage storage.Storage
Done chan struct{}
}

type Model = clienttypes.Model
@@ -84,7 +85,7 @@ type OutputConfig struct {

// DataSamples defines samples information
type DataSamples struct {
Numbers int
PreviousNumbers int
TrainSamples []string
EvalVersionSamples [][]string
EvalSamples []string
@@ -114,6 +115,10 @@ const (
TriggerReadyStatus = "ready"
// TriggerCompletedStatus is the completed status about trigger
TriggerCompletedStatus = "completed"

AnnotationsRoundsKey = "sedna.io/rounds"
AnnotationsNumberOfSamplesKey = "sedna.io/number-of-samples"
AnnotationsDataFileOfEvalKey = "sedna.io/data-file-of-eval"
)

// New creates a incremental-learning-job manager
@@ -139,53 +144,66 @@ func (im *Manager) Start() error {
}

// trainTask starts training task
func (im *Manager) trainTask(job *Job, currentRound int) error {
func (im *Manager) trainTask(job *Job) error {
jobConfig := job.JobConfig

latestCond := im.getLatestCondition(job)
jobStage := latestCond.Stage
currentType := latestCond.Type

if currentType == sednav1.ILJobStageCondWaiting {
if job.Dataset == nil {
return fmt.Errorf("job(name=%s) dataset not ready", jobConfig.UniqueIdentifier)
}

err := im.loadTrainModel(job)
if err != nil {
return fmt.Errorf("failed to sync train model, and waiting it: %v", err)
}
var err error

if currentRound < jobConfig.Rounds {
currentRound = jobConfig.Rounds
initTriggerStatus(jobConfig)
err = im.loadDataset(job)
if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w",
jobConfig.UniqueIdentifier, err)
}
}

if currentType == sednav1.ILJobStageCondWaiting && jobConfig.TrainTriggerStatus == TriggerReadyStatus {
payload, ok, err := im.triggerTrainTask(job)
if !ok {
return nil
if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
return fmt.Errorf("job(%s)'s dataset not ready", jobConfig.UniqueIdentifier)
}

err = im.loadTrainModel(job)
if err != nil {
klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v",
jobConfig.UniqueIdentifier, jobStage, err)
return err
return fmt.Errorf("failed to sync train model, and waiting it: %w", err)
}

err = im.Client.WriteMessage(payload, job.getHeader())
if err != nil {
klog.Errorf("job(name=%s) failed to write message: %v",
jobConfig.UniqueIdentifier, err)
return err
}
initTriggerStatus(jobConfig)

if jobConfig.TrainTriggerStatus == TriggerReadyStatus {
payload, ok, err := im.triggerTrainTask(job)
if !ok {
return nil
}

jobConfig.TrainTriggerStatus = TriggerCompletedStatus
jobConfig.Rounds++
forwardSamples(jobConfig, jobStage)
klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
jobConfig.UniqueIdentifier, jobStage)
if err != nil {
klog.Errorf("job(%s) failed to complete the %sing phase triggering task: %v",
jobConfig.UniqueIdentifier, jobStage, err)
job.JobConfig.Rounds--
return err
}

err = im.Client.WriteMessage(payload, job.getHeader())
if err != nil {
klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err)
job.JobConfig.Rounds--
return err
}

forwardSamples(jobConfig, jobStage)

err = im.saveJobToDB(job)
if err != nil {
klog.Errorf("job(%s) failed to save job to db: %v",
jobConfig.UniqueIdentifier, err)
// continue anyway
}

jobConfig.TrainTriggerStatus = TriggerCompletedStatus
klog.Infof("job(%s) completed the %sing phase triggering task successfully",
jobConfig.UniqueIdentifier, jobStage)
}
}

return nil
@@ -200,29 +218,39 @@ func (im *Manager) evalTask(job *Job) error {
currentType := latestCond.Type

if currentType == sednav1.ILJobStageCondWaiting {
err := im.loadDeployModel(job)
if err != nil {
return fmt.Errorf("failed to sync deploy model, and waiting it: %v", err)
}
}
var err error

if currentType == sednav1.ILJobStageCondWaiting && jobConfig.EvalTriggerStatus == TriggerReadyStatus {
payload, err := im.triggerEvalTask(job)
if err != nil {
klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v",
jobConfig.UniqueIdentifier, jobStage, err)
return err
err = im.loadDataset(job)
if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w",
jobConfig.UniqueIdentifier, err)
}

err = im.Client.WriteMessage(payload, job.getHeader())
err = im.loadDeployModel(job)
if err != nil {
return err
return fmt.Errorf("failed to sync deploy model, and waiting it: %w", err)
}

jobConfig.EvalTriggerStatus = TriggerCompletedStatus
forwardSamples(jobConfig, jobStage)
klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
jobConfig.UniqueIdentifier, jobStage)
if jobConfig.TrainTriggerStatus == TriggerReadyStatus {
payload, err := im.triggerEvalTask(job)
if err != nil {
klog.Errorf("job(%s) completed the %sing phase triggering task failed: %v",
jobConfig.UniqueIdentifier, jobStage, err)
return err
}

err = im.Client.WriteMessage(payload, job.getHeader())
if err != nil {
klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err)
return err
}

forwardSamples(jobConfig, jobStage)

jobConfig.TrainTriggerStatus = TriggerCompletedStatus
klog.Infof("job(%s) completed the %sing phase triggering task successfully",
jobConfig.UniqueIdentifier, jobStage)
}
}

return nil
@@ -236,84 +264,73 @@ func (im *Manager) deployTask(job *Job) {

neededDeploy, err = im.triggerDeployTask(job)
status := clienttypes.UpstreamMessage{Phase: string(sednav1.ILJobDeploy)}
models := im.getModelFromJobConditions(job, sednav1.ILJobDeploy)

if err == nil && neededDeploy {
deployModel, err := im.deployModel(job)
if err == nil && neededDeploy && models != nil {
trainedModel := models[0]
deployModel := models[1]
err = im.updateDeployModelFile(job, trainedModel.URL, deployModel.URL)
if err != nil {
klog.Errorf("failed to deploy model for job(name=%s): %v", jobConfig.UniqueIdentifier, err)
} else {
klog.Infof("deployed model for job(name=%s) successfully", jobConfig.UniqueIdentifier)
}
if err != nil || deployModel == nil {
status.Status = string(sednav1.ILJobStageCondFailed)
klog.Errorf("job(%s) failed to update model: %v", jobConfig.UniqueIdentifier, err)
} else {
status.Status = string(sednav1.ILJobStageCondReady)
status.Input = &clienttypes.Input{
Models: []Model{
*deployModel,
},
}
klog.Infof("job(%s) updated model successfully", jobConfig.UniqueIdentifier)
}

status.Input = &clienttypes.Input{
Models: models,
}

klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
klog.Infof("job(%s) completed the %sing phase triggering task successfully",
jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
} else {
// No need to deploy, just report completed status
// TODO: instead of reporting deploy-completed, another more reasonable status
klog.Infof("no need to deploy model for job(name=%s)", jobConfig.UniqueIdentifier)
klog.Infof("job(%s) isn't need to deploy model", jobConfig.UniqueIdentifier)
status.Status = string(sednav1.ILJobStageCondCompleted)
}

err = im.Client.WriteMessage(status, job.getHeader())
if err != nil {
klog.Errorf("job(name=%s) complete the %s task failed, error: %v",
klog.Errorf("job(%s) completed the %s task failed: %v",
jobConfig.UniqueIdentifier, sednav1.ILJobDeploy, err)
}

klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
klog.Infof("job(%s) completed the %s task successfully", jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
}

// startJob starts a job
func (im *Manager) startJob(name string) {
var err error
job := im.IncrementalJobMap[name]

job.JobConfig = new(JobConfig)
jobConfig := job.JobConfig
jobConfig.UniqueIdentifier = name

err = im.initJob(job)
err = im.initJob(job, name)
if err != nil {
klog.Errorf("failed to init job (name=%s): %+v", jobConfig.UniqueIdentifier)
klog.Errorf("failed to init job (name=%s): %+v", name)
return
}

klog.Infof("incremental job(name=%s) is started", name)
defer klog.Infof("incremental learning job(name=%s) is stopped", name)
klog.Infof("incremental job(%s) was started", name)
defer klog.Infof("incremental learning job(%s) was stopped", name)

cond := im.getLatestCondition(job)
currentType := cond.Type
jobStage := cond.Stage
if jobStage == sednav1.ILJobTrain && currentType == sednav1.ILJobStageCondWaiting {
go im.handleData(job)
}

currentRound := jobConfig.Rounds
// handle data from dataset
go im.handleData(job)

tick := time.NewTicker(JobIterationIntervalSeconds * time.Second)
for {
select {
case <-job.Done:
case <-job.JobConfig.Done:
return
default:
}

latestCond := im.getLatestCondition(job)
jobStage := latestCond.Stage
cond := im.getLatestCondition(job)
jobStage := cond.Stage

switch jobStage {
case sednav1.ILJobTrain:
err = im.trainTask(job, currentRound)
err = im.trainTask(job)
case sednav1.ILJobEval:
err = im.evalTask(job)
default:
@@ -322,8 +339,7 @@ func (im *Manager) startJob(name string) {
}

if err != nil {
klog.Errorf("job(name=%s) complete the %s task failed, error: %v",
jobConfig.UniqueIdentifier, jobStage, err)
klog.Errorf("job(%s) failed to complete the %s task: %v", name, jobStage, err)
}

<-tick.C
@@ -338,8 +354,6 @@ func (im *Manager) Insert(message *clienttypes.Message) error {
job, ok := im.IncrementalJobMap[name]
if !ok {
job = &Job{}
job.Storage = storage.Storage{IsLocalStorage: false}
job.Done = make(chan struct{})
im.IncrementalJobMap[name] = job
first = true
}
@@ -348,13 +362,6 @@ func (im *Manager) Insert(message *clienttypes.Message) error {
return err
}

credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey]
if credential != "" {
if err := job.Storage.SetCredential(credential); err != nil {
return fmt.Errorf("failed to set job(name=%s)'s storage credential, error: %+v", name, err)
}
}

if first {
go im.startJob(name)
}
@@ -370,8 +377,8 @@ func (im *Manager) Insert(message *clienttypes.Message) error {
func (im *Manager) Delete(message *clienttypes.Message) error {
name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)

if job, ok := im.IncrementalJobMap[name]; ok && job.Done != nil {
close(job.Done)
if job, ok := im.IncrementalJobMap[name]; ok && job.JobConfig.Done != nil {
close(job.JobConfig.Done)
}

delete(im.IncrementalJobMap, name)
@@ -383,13 +390,104 @@ func (im *Manager) Delete(message *clienttypes.Message) error {
return nil
}

// updateJobFromDB recovers the job's incremental-learning state (rounds,
// previously-seen sample count, and the last eval sample set) from the
// record persisted in the local db, so a restarted LC can resume the job.
// A missing annotation is not an error: recovery simply stops at the
// first absent key, leaving the remaining fields at their defaults.
func (im *Manager) updateJobFromDB(job *Job) error {
	var err error

	previousJob, err := db.GetResource(job.JobConfig.UniqueIdentifier)
	if err != nil {
		return err
	}

	m := metav1.ObjectMeta{}
	// BUG FIX: the original wrote "if err != json.Unmarshal(...)", which
	// COMPARES err to the result instead of assigning it, and then returns
	// the still-nil err — silently swallowing unmarshal failures.
	if err = json.Unmarshal([]byte(previousJob.ObjectMeta), &m); err != nil {
		return err
	}

	rounds, ok := m.Annotations[AnnotationsRoundsKey]
	if !ok {
		return nil
	}

	if job.JobConfig.Rounds, err = strconv.Atoi(rounds); err != nil {
		return err
	}

	numberOfSamples, ok := m.Annotations[AnnotationsNumberOfSamplesKey]
	if !ok {
		return nil
	}

	if job.JobConfig.DataSamples.PreviousNumbers, err = strconv.Atoi(numberOfSamples); err != nil {
		return err
	}

	dataFileOfEval, ok := m.Annotations[AnnotationsDataFileOfEvalKey]
	if !ok {
		return nil
	}

	localURL, err := job.JobConfig.Storage.Download(dataFileOfEval, "")
	// Check the download error BEFORE scheduling cleanup: the original
	// deferred os.RemoveAll(localURL) on a path that may be invalid when
	// Download failed.
	if err != nil {
		return err
	}
	if !job.JobConfig.Storage.IsLocalStorage {
		defer os.RemoveAll(localURL)
	}

	// Read the downloaded local copy. The original passed dataFileOfEval
	// (the possibly-remote source URL) here, which made the Download above
	// pointless for non-local storage — NOTE(review): confirm GetSamples
	// expects a local file path.
	samples, err := dataset.GetSamples(localURL)
	if err != nil {
		klog.Errorf("read file %s failed: %v", localURL, err)
		return err
	}

	job.JobConfig.DataSamples.EvalVersionSamples = append(job.JobConfig.DataSamples.EvalVersionSamples, samples)

	return nil
}

// saveJobToDB persists the job to db, recording its recovery state
// (current rounds, number of processed samples, and the eval data file)
// in the object's annotations so updateJobFromDB can restore it after a
// restart.
func (im *Manager) saveJobToDB(job *Job) error {
	ann := job.ObjectMeta.Annotations
	if ann == nil {
		ann = make(map[string]string)
		// BUG FIX: attach the freshly-created map back to the ObjectMeta.
		// The original never did this, so when Annotations started out nil
		// the values written below were dropped from the persisted record.
		job.ObjectMeta.Annotations = ann
	}

	ann[AnnotationsRoundsKey] = strconv.Itoa(job.JobConfig.Rounds)
	ann[AnnotationsNumberOfSamplesKey] = strconv.Itoa(job.JobConfig.DataSamples.PreviousNumbers)
	ann[AnnotationsDataFileOfEvalKey] = job.JobConfig.EvalDataURL

	return db.SaveResource(job.JobConfig.UniqueIdentifier, job.TypeMeta, job.ObjectMeta, job.Spec)
}

// initJob inits the job object
func (im *Manager) initJob(job *Job) error {
func (im *Manager) initJob(job *Job, name string) error {
job.JobConfig = new(JobConfig)

jobConfig := job.JobConfig
jobConfig.UniqueIdentifier = name

jobConfig.Storage = storage.Storage{IsLocalStorage: false}
credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey]
if credential != "" {
if err := job.JobConfig.Storage.SetCredential(credential); err != nil {
return fmt.Errorf("failed to set storage credential: %w", err)
}
}

jobConfig.Done = make(chan struct{})
jobConfig.Lock = sync.Mutex{}
jobConfig.Rounds = 0

jobConfig.DataSamples = &DataSamples{
PreviousNumbers: 0,
TrainSamples: make([]string, 0),
EvalVersionSamples: make([][]string, 0),
EvalSamples: make([]string, 0),
}

jobConfig.Rounds = 1
initTriggerStatus(jobConfig)
trainTrigger, err := newTrigger(job.Spec.TrainSpec.Trigger)
if err != nil {
return fmt.Errorf("failed to init train trigger: %+w", err)
@@ -403,13 +501,13 @@ func (im *Manager) initJob(job *Job) error {

outputDir := job.Spec.OutputDir

isLocalURL, err := job.Storage.IsLocalURL(outputDir)
isLocalURL, err := jobConfig.Storage.IsLocalURL(outputDir)
if err != nil {
return fmt.Errorf("job(name=%s)'s output dir is invalid, error: %+v", job.Name, outputDir)
return fmt.Errorf("job(%s)'s output dir(%s) is invalid: %+w", job.Name, outputDir, err)
}

if isLocalURL {
job.Storage.IsLocalStorage = true
jobConfig.Storage.IsLocalStorage = true
outputDir = util.AddPrefixPath(im.VolumeMountPrefix, outputDir)
}

@@ -419,6 +517,12 @@ func (im *Manager) initJob(job *Job) error {
return err
}

if err := im.updateJobFromDB(job); err != nil {
klog.Errorf("job(%s) failed to update job from db: %v", name, err)
}

initTriggerStatus(jobConfig)

return nil
}

@@ -442,42 +546,63 @@ func newTrigger(t sednav1.Trigger) (trigger.Base, error) {
return trigger.NewTrigger(triggerMap)
}

// getTrainOrEvalModel gets train model or eval model from job conditions
func (im *Manager) getTrainOrEvalModel(job *Job, jobStage sednav1.ILJobStage) *Model {
// getModelFromJobConditions gets model from job conditions for train/eval/deploy
func (im *Manager) getModelFromJobConditions(job *Job, jobStage sednav1.ILJobStage) []Model {
jobConditions := job.Status.Conditions

// TODO: runtime.type changes to common.type for gm and lc
var models []runtime.Model

for i := len(jobConditions) - 1; i >= 0; i-- {
var cond gmtypes.IncrementalCondData
jobCond := jobConditions[i]
if jobCond.Stage == sednav1.ILJobTrain && jobCond.Type == sednav1.ILJobStageCondCompleted {
if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil {
continue
}

if cond.Output == nil {
continue
getModels := func(stage sednav1.ILJobStage, currentType sednav1.ILJobStageConditionType, dataType string) []runtime.Model {
// TODO: runtime.type changes to common.type for gm and lc
for i := len(jobConditions) - 1; i >= 0; i-- {
var cond gmtypes.IncrementalCondData
jobCond := jobConditions[i]
if jobCond.Stage == stage && jobCond.Type == currentType {
if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil {
continue
}

if dataType == "input" {
if cond.Input == nil {
continue
}

return cond.Input.Models
} else if dataType == "output" {
if cond.Output == nil {
continue
}

return cond.Output.Models
}
}
// models list has two model, first is deploy model, second is trained model
models = cond.Output.Models

break
}
}

// models must have two model file info which are output of train,
// first model will be used for inference if it evaluated as excellent, second model will be used for retaining.
if len(models) != 2 {
return nil
}

switch jobStage {
case sednav1.ILJobTrain:
return &Model{Format: models[1].Format, URL: models[1].URL}
// the second model is the pre-trained model of train stage.
models := getModels(sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output")
if models != nil {
return []Model{{Format: models[1].Format, URL: models[1].URL}}
}
case sednav1.ILJobEval:
return &Model{Format: models[0].Format, URL: models[0].URL}
// the first model is the output model of train stage.
models := getModels(sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output")
if models != nil {
return []Model{{Format: models[0].Format, URL: models[0].URL}}
}
case sednav1.ILJobDeploy:
// two models for deploy stage:
// the first model is the output model of train stage, which was evaluated as better than the second model in eval stage.
// the second model is the serving model used in the inference worker.
var deployModels []Model
models := getModels(sednav1.ILJobEval, sednav1.ILJobStageCondReady, "input")
for _, m := range models {
deployModels = append(deployModels, Model{Format: m.Format, URL: m.URL})
}

return deployModels
}

return nil
@@ -499,31 +624,34 @@ func (im *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) {
return nil, false, nil
}

job.JobConfig.Rounds++

var m *Model

latestCondition := im.getLatestCondition(job)
rounds := jobConfig.Rounds
if rounds <= 1 {
m = jobConfig.TrainModel
} else {
m = im.getTrainOrEvalModel(job, latestCondition.Stage)
if m == nil {
return nil, false, err
models := im.getModelFromJobConditions(job, latestCondition.Stage)
if models != nil {
m = &models[0]
}
}

var dataIndexURL string
jobConfig.TrainDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.TrainSamples,
jobConfig.OutputConfig.SamplesOutput["train"], rounds, job.Dataset.Spec.Format, job.Dataset.URLPrefix)
jobConfig.OutputConfig.SamplesOutput["train"], rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix)
if err != nil {
klog.Errorf("job(name=%s) train phase: write samples to the file(%s) is failed, error: %v",
job.JobConfig.Rounds--
klog.Errorf("job(%s) train phase: write samples to the file(%s) is failed: %v",
jobConfig.UniqueIdentifier, jobConfig.TrainDataURL, err)
return nil, false, err
}

dataURL := jobConfig.TrainDataURL
outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(rounds)}, "/")
if job.Storage.IsLocalStorage {
if jobConfig.Storage.IsLocalStorage {
dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
outputDir = util.TrimPrefixPath(im.VolumeMountPrefix, outputDir)
@@ -540,6 +668,7 @@ func (im *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) {
Status: string(sednav1.ILJobStageCondReady),
Input: &input,
}

jobConfig.TriggerTime = time.Now()
return &msg, true, nil
}
@@ -551,30 +680,29 @@ func (im *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, erro

latestCondition := im.getLatestCondition(job)

m := im.getTrainOrEvalModel(job, latestCondition.Stage)
if m == nil {
ms := im.getModelFromJobConditions(job, latestCondition.Stage)
if ms == nil {
return nil, err
}

models := []Model{*m, {
models := []Model{ms[0], {
Format: jobConfig.DeployModel.Format,
URL: jobConfig.DeployModel.URL,
}}
// EvalModels has two models, first is trained model, second is deployed model
jobConfig.EvalModels = models

var dataIndexURL string
rounds := jobConfig.Rounds
jobConfig.EvalDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"],
rounds, job.Dataset.Spec.Format, job.Dataset.URLPrefix)
job.JobConfig.Rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix)
if err != nil {
klog.Errorf("job(name=%s) eval phase: write samples to the file(%s) is failed, error: %v",
klog.Errorf("job(%s) eval phase: write samples to the file(%s) is failed: %v",
jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err)
return nil, err
}

dataURL := jobConfig.EvalDataURL
if job.Storage.IsLocalStorage {
if jobConfig.Storage.IsLocalStorage {
dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
}
@@ -652,32 +780,19 @@ func (im *Manager) triggerDeployTask(job *Job) (bool, error) {
return jobConfig.DeployTrigger.Trigger(metricDelta), nil
}

// deployModel deploys model
func (im *Manager) deployModel(job *Job) (*Model, error) {
jobConfig := job.JobConfig

trainedModel := jobConfig.EvalModels[0].URL
deployModel := jobConfig.EvalModels[1].URL
if job.Storage.IsLocalStorage {
// updateDeployModelFile updates deploy model file
func (im *Manager) updateDeployModelFile(job *Job, trainedModel string, deployModel string) error {
if job.JobConfig.Storage.IsLocalStorage {
trainedModel = util.AddPrefixPath(im.VolumeMountPrefix, trainedModel)
}

if err := job.updateDeployModel(deployModel, trainedModel); err != nil {
return nil, err
if err := job.JobConfig.Storage.CopyFile(trainedModel, deployModel); err != nil {
return fmt.Errorf("failed to copy trained model(url=%s) to the deploy model(url=%s): %w",
trainedModel, deployModel, err)
}

klog.Infof("job(name=%s) deploys model(url=%s) successfully", jobConfig.UniqueIdentifier, trainedModel)

return &jobConfig.EvalModels[0], nil
}

func (job *Job) updateDeployModel(deployModel string, newModel string) error {
if err := job.Storage.CopyFile(newModel, deployModel); err != nil {
return fmt.Errorf("copy model(url=%s) to the deploy model(url=%s) failed, error: %+v",
newModel, deployModel, err)
}
klog.V(4).Infof("copy trained model(url=%s) to the deploy model(url=%s) successfully", trainedModel, deployModel)

klog.Infof("copy model(url=%s) to the deploy model(url=%s) successfully", newModel, deployModel)
return nil
}

@@ -687,16 +802,16 @@ func (job *Job) createOutputDir(jobConfig *JobConfig) error {

dirNames := []string{"data/train", "data/eval", "train", "eval"}

if job.Storage.IsLocalStorage {
if job.JobConfig.Storage.IsLocalStorage {
if err := util.CreateFolder(outputDir); err != nil {
klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, outputDir)
klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, outputDir, err)
return err
}

for _, v := range dirNames {
dir := path.Join(outputDir, v)
if err := util.CreateFolder(dir); err != nil {
klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, dir)
klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, dir, err)
return err
}
}
@@ -772,7 +887,7 @@ func (im *Manager) loadDeployModel(job *Job) error {

// loadDataset loads dataset information
func (im *Manager) loadDataset(job *Job) error {
if job.Dataset != nil {
if job.JobConfig.Dataset != nil {
// already loaded
return nil
}
@@ -783,15 +898,7 @@ func (im *Manager) loadDataset(job *Job) error {
return fmt.Errorf("not exists dataset(name=%s)", datasetName)
}

jobConfig := job.JobConfig
jobConfig.DataSamples = &DataSamples{
Numbers: 0,
TrainSamples: make([]string, 0),
EvalVersionSamples: make([][]string, 0),
EvalSamples: make([]string, 0),
}

job.Dataset = dataset
job.JobConfig.Dataset = dataset
return nil
}

@@ -803,49 +910,47 @@ func (im *Manager) handleData(job *Job) {
iterCount := 0
for {
select {
case <-job.Done:
case <-jobConfig.Done:
return
default:
}

// in case dataset is not synced to LC before job synced to LC
// here call loadDataset in each period
err := im.loadDataset(job)
if iterCount%100 == 0 {
klog.Infof("job(name=%s) handling dataset", jobConfig.UniqueIdentifier)
klog.V(4).Infof("job(%s) is handling dataset", jobConfig.UniqueIdentifier)
}
iterCount++
if err != nil {
klog.Warningf("job(name=%s) failed to load dataset, and waiting it: %v",
jobConfig.UniqueIdentifier,
err)

if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
// already loaded dataset
<-tick.C
continue
}

dataset := job.Dataset
dataset := jobConfig.Dataset
currentNumberOfSamples := dataset.DataSource.NumberOfSamples
previousNumberOfSamples := jobConfig.DataSamples.PreviousNumbers

if dataset.DataSource != nil && len(dataset.DataSource.TrainSamples) > jobConfig.DataSamples.Numbers {
if dataset.DataSource != nil && currentNumberOfSamples > previousNumberOfSamples {
samples := dataset.DataSource.TrainSamples
trainNum := int(job.Spec.Dataset.TrainProb * float64(len(samples)-jobConfig.DataSamples.Numbers))
newNumberOfSamples := currentNumberOfSamples - previousNumberOfSamples
trainNum := int(job.Spec.Dataset.TrainProb * float64(newNumberOfSamples))

jobConfig.Lock.Lock()
jobConfig.DataSamples.TrainSamples = append(jobConfig.DataSamples.TrainSamples,
samples[(jobConfig.DataSamples.Numbers+1):(jobConfig.DataSamples.Numbers+trainNum+1)]...)
klog.Infof("job(name=%s) current train samples nums is %d",
jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.TrainSamples))
samples[(previousNumberOfSamples+1):(previousNumberOfSamples+trainNum+1)]...)
klog.Infof("job(%s)'s current train samples nums is %d", jobConfig.UniqueIdentifier, trainNum)

jobConfig.DataSamples.EvalVersionSamples = append(jobConfig.DataSamples.EvalVersionSamples,
samples[(jobConfig.DataSamples.Numbers+trainNum+1):])
samples[(previousNumberOfSamples+trainNum+1):])
jobConfig.Lock.Unlock()

for _, v := range jobConfig.DataSamples.EvalVersionSamples {
jobConfig.DataSamples.EvalSamples = append(jobConfig.DataSamples.EvalSamples, v...)
}
klog.Infof("job(name=%s) current eval samples nums is %d",
jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples))
evalNum := newNumberOfSamples - trainNum
klog.Infof("job(%s)'s current eval samples nums is %d", jobConfig.UniqueIdentifier, evalNum)

jobConfig.DataSamples.Numbers = len(samples)
jobConfig.DataSamples.PreviousNumbers = currentNumberOfSamples
}

<-tick.C
@@ -866,18 +971,23 @@ func createFile(dir string, format string, isLocalStorage bool) (string, string)

// writeSamples writes samples information to a file
func (im *Manager) writeSamples(job *Job, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) {
if samples == nil {
return "", "", fmt.Errorf("not samples")
}

jobConfig := job.JobConfig
subDir := strings.Join([]string{dir, strconv.Itoa(rounds)}, "/")
fileURL, absURLFile := createFile(subDir, format, job.Dataset.Storage.IsLocalStorage)
fileURL, absURLFile := createFile(subDir, format, jobConfig.Dataset.Storage.IsLocalStorage)

if job.Storage.IsLocalStorage {
if jobConfig.Storage.IsLocalStorage {
if err := util.CreateFolder(subDir); err != nil {
return "", "", err
}
if err := im.writeByLine(samples, fileURL); err != nil {
return "", "", err
}

if !job.Dataset.Storage.IsLocalStorage {
if !jobConfig.Dataset.Storage.IsLocalStorage {
tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
if err := im.writeByLine(tempSamples, absURLFile); err != nil {
return "", "", err
@@ -892,13 +1002,13 @@ func (im *Manager) writeSamples(job *Job, samples []string, dir string, rounds i
return "", "", err
}

localFileURL, localAbsURLFile := createFile(temporaryDir, format, job.Dataset.Storage.IsLocalStorage)
localFileURL, localAbsURLFile := createFile(temporaryDir, format, jobConfig.Dataset.Storage.IsLocalStorage)

if err := im.writeByLine(samples, localFileURL); err != nil {
return "", "", err
}

if err := job.Storage.Upload(localFileURL, fileURL); err != nil {
if err := jobConfig.Storage.Upload(localFileURL, fileURL); err != nil {
return "", "", err
}

@@ -908,7 +1018,7 @@ func (im *Manager) writeSamples(job *Job, samples []string, dir string, rounds i
return "", "", err
}

if err := job.Storage.Upload(localAbsURLFile, absURLFile); err != nil {
if err := jobConfig.Storage.Upload(localAbsURLFile, absURLFile); err != nil {
return "", "", err
}

@@ -931,12 +1041,12 @@ func (im *Manager) writeByLine(samples []string, fileURL string) error {
_, _ = fmt.Fprintln(w, line)
}
if err := w.Flush(); err != nil {
klog.Errorf("write file(%s) failed", fileURL)
klog.Errorf("failed to write file(%s): %v", fileURL, err)
return err
}

if err := file.Close(); err != nil {
klog.Errorf("close file failed, error: %v", err)
klog.Errorf("failed to close file(%s): %v", fileURL, err)
return err
}

@@ -972,7 +1082,7 @@ func (im *Manager) monitorWorker() {
}

if err := im.Client.WriteMessage(msg, job.getHeader()); err != nil {
klog.Errorf("job(name=%s) failed to write message: %v", name, err)
klog.Errorf("job(%s) failed to write message: %v", name, err)
continue
}

@@ -987,7 +1097,7 @@ func (im *Manager) handleWorkerMessage(job *Job, workerMessage workertypes.Messa
workerKind := strings.ToLower(workerMessage.Kind)

if jobStage != workerKind {
klog.Warningf("job(name=%s) %s phase get worker(kind=%s)", job.JobConfig.UniqueIdentifier,
klog.Warningf("job(%s)'s %s phase get worker(kind=%s)", job.JobConfig.UniqueIdentifier,
jobStage, workerKind)
return
}
@@ -1021,11 +1131,11 @@ func (im *Manager) handleWorkerMessage(job *Job, workerMessage workertypes.Messa
jobName := job.JobConfig.UniqueIdentifier

if workerStatus == workertypes.CompletedStatus {
klog.Infof("job(name=%s) complete the %s task successfully", jobName, jobStage)
klog.Infof("job(%s) completed the %s task successfully", jobName, jobStage)
switch latestCond.Stage {
case sednav1.ILJobEval:
job.JobConfig.EvalResult = models
// when eval worker is complete, the deploy task starts immediately without waiting for the notification of GM.
// when eval worker is completed status, the deploy task will starts immediately without waiting for the notification of GM.
im.deployTask(job)
}
}

0 comments on commit bf39a7c

Please sign in to comment.