From 2506b9658f9bdfe1a54f21a809539687a8cd9244 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 12 Aug 2023 11:48:59 -0700 Subject: [PATCH 01/11] Transition to Queue if the pytorch JobCondition is empty Signed-off-by: Kevin Su --- .../plugins/k8s/kfoperators/common/common_operator.go | 8 +++++--- .../k8s/kfoperators/common/common_operator_test.go | 9 ++++++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/go/tasks/plugins/k8s/kfoperators/common/common_operator.go b/go/tasks/plugins/k8s/kfoperators/common/common_operator.go index 3a86ed9a5..b79547a29 100644 --- a/go/tasks/plugins/k8s/kfoperators/common/common_operator.go +++ b/go/tasks/plugins/k8s/kfoperators/common/common_operator.go @@ -45,7 +45,7 @@ func ExtractMPICurrentCondition(jobConditions []commonOp.JobCondition) (commonOp } } - return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions) + return commonOp.JobCondition{}, nil } // ExtractCurrentCondition will return the first job condition for tensorflow/pytorch @@ -61,13 +61,15 @@ func ExtractCurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.Jo } } } - - return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions) + return commonOp.JobCondition{}, nil } // GetPhaseInfo will return the phase of kubeflow job func GetPhaseInfo(currentCondition commonOp.JobCondition, occurredAt time.Time, taskPhaseInfo pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error) { + if len(currentCondition.Type) == 0 { + return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "JobCreated"), nil + } switch currentCondition.Type { case commonOp.JobCreated: return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "JobCreated"), nil diff --git a/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go b/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go index 8914e976f..933fbf5f8 100644 --- a/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go +++ b/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go @@ -67,10 +67,17 @@ func TestExtractCurrentCondition(t *testing.T) { } func TestGetPhaseInfo(t *testing.T) { + jobCreating := commonOp.JobCondition{} + taskPhase, err := GetPhaseInfo(jobCreating, time.Now(), pluginsCore.TaskInfo{}) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseQueued, taskPhase.Phase()) + assert.NotNil(t, taskPhase.Info()) + assert.Nil(t, err) + jobCreated := commonOp.JobCondition{ Type: commonOp.JobCreated, } - taskPhase, err := GetPhaseInfo(jobCreated, time.Now(), pluginsCore.TaskInfo{}) + taskPhase, err = GetPhaseInfo(jobCreated, time.Now(), pluginsCore.TaskInfo{}) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseQueued, taskPhase.Phase()) assert.NotNil(t, taskPhase.Info()) From 0d2aa97843d6e9fac4e22baa13942e90c82e1705 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 12 Aug 2023 11:55:57 -0700 Subject: [PATCH 02/11] nit Signed-off-by: Kevin Su --- go/tasks/plugins/k8s/kfoperators/common/common_operator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/tasks/plugins/k8s/kfoperators/common/common_operator.go b/go/tasks/plugins/k8s/kfoperators/common/common_operator.go index b79547a29..32564946b 100644 --- a/go/tasks/plugins/k8s/kfoperators/common/common_operator.go +++ b/go/tasks/plugins/k8s/kfoperators/common/common_operator.go @@ -60,6 +60,7 @@ func ExtractCurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.Jo return jc, nil } } + return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions) } return commonOp.JobCondition{}, nil } From fff66323d4ad907f928935ae75d09f8446894d02 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 12 Aug 2023 12:16:55 -0700 Subject: [PATCH 03/11] nit Signed-off-by: Kevin Su --- go/tasks/plugins/k8s/kfoperators/common/common_operator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/tasks/plugins/k8s/kfoperators/common/common_operator.go b/go/tasks/plugins/k8s/kfoperators/common/common_operator.go index 32564946b..f4b1224bd 100644 --- a/go/tasks/plugins/k8s/kfoperators/common/common_operator.go +++ b/go/tasks/plugins/k8s/kfoperators/common/common_operator.go @@ -43,6 +43,7 @@ func ExtractMPICurrentCondition(jobConditions []commonOp.JobCondition) (commonOp return jc, nil } } + return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions) } return commonOp.JobCondition{}, nil From c7be349a2bf0f25c1365fd516b5a986f14b26cfb Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 12 Aug 2023 12:24:02 -0700 Subject: [PATCH 04/11] fix tests Signed-off-by: Kevin Su --- .../k8s/kfoperators/common/common_operator.go | 18 ----------- .../common/common_operator_test.go | 32 ++++++------------- go/tasks/plugins/k8s/kfoperators/mpi/mpi.go | 3 +- 3 files changed, 10 insertions(+), 43 deletions(-) diff --git a/go/tasks/plugins/k8s/kfoperators/common/common_operator.go b/go/tasks/plugins/k8s/kfoperators/common/common_operator.go index f4b1224bd..acaf09680 100644 --- a/go/tasks/plugins/k8s/kfoperators/common/common_operator.go +++ b/go/tasks/plugins/k8s/kfoperators/common/common_operator.go @@ -31,24 +31,6 @@ type ReplicaEntry struct { RestartPolicy commonOp.RestartPolicy } -// ExtractMPICurrentCondition will return the first job condition for MPI -func ExtractMPICurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.JobCondition, error) { - if jobConditions != nil { - sort.Slice(jobConditions, func(i, j int) bool { - return jobConditions[i].LastTransitionTime.Time.After(jobConditions[j].LastTransitionTime.Time) - }) - - for _, jc := range jobConditions { - if jc.Status == v1.ConditionTrue { - return jc, nil - } - } - return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions) - } - - return commonOp.JobCondition{}, nil -} - // ExtractCurrentCondition will return the first job condition for tensorflow/pytorch func ExtractCurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.JobCondition, error) { if jobConditions != nil { diff --git a/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go b/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go index 933fbf5f8..dc844b866 100644 --- a/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go +++ b/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go @@ -18,7 +18,7 @@ import ( meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func TestExtractMPICurrentCondition(t *testing.T) { +func TestExtractCurrentCondition(t *testing.T) { jobCreated := commonOp.JobCondition{ Type: commonOp.JobCreated, Status: corev1.ConditionTrue, @@ -31,35 +31,21 @@ func TestExtractMPICurrentCondition(t *testing.T) { jobCreated, jobRunningActive, } - currentCondition, err := ExtractMPICurrentCondition(jobConditions) + currentCondition, err := ExtractCurrentCondition(jobConditions) assert.NoError(t, err) assert.Equal(t, currentCondition, jobCreated) jobConditions = nil - currentCondition, err = ExtractMPICurrentCondition(jobConditions) - assert.Error(t, err) + currentCondition, err = ExtractCurrentCondition(jobConditions) assert.Equal(t, currentCondition, commonOp.JobCondition{}) - assert.Equal(t, err, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions)) -} -func TestExtractCurrentCondition(t *testing.T) { - jobCreated := commonOp.JobCondition{ - Type: commonOp.JobCreated, - Status: corev1.ConditionTrue, - } - jobRunningActive := commonOp.JobCondition{ - Type: commonOp.JobRunning, - Status: corev1.ConditionFalse, - } - jobConditions := []commonOp.JobCondition{ - jobCreated, - jobRunningActive, - } - currentCondition, err := ExtractCurrentCondition(jobConditions) - assert.NoError(t, err) - assert.Equal(t, currentCondition, jobCreated) + jobCreating := commonOp.JobCondition{} + jobConditions = []commonOp.JobCondition{jobCreating} + currentCondition, err = ExtractCurrentCondition(jobConditions) + assert.Equal(t, currentCondition, commonOp.JobCondition{}) - jobConditions = nil + jobUnknown := commonOp.JobCondition{Type: "unknown"} + jobConditions = []commonOp.JobCondition{jobUnknown} currentCondition, err = ExtractCurrentCondition(jobConditions) assert.Error(t, err) assert.Equal(t, currentCondition, commonOp.JobCondition{}) diff --git a/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go b/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go index 92c864340..7123dff52 100644 --- a/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go +++ b/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go @@ -209,7 +209,7 @@ func (mpiOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginContext if err != nil { return pluginsCore.PhaseInfoUndefined, err } - currentCondition, err := common.ExtractMPICurrentCondition(app.Status.Conditions) + currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions) if err != nil { return pluginsCore.PhaseInfoUndefined, err } @@ -223,7 +223,6 @@ func (mpiOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginContext } return common.GetMPIPhaseInfo(currentCondition, occurredAt, taskPhaseInfo) - } func init() { From c97c418566769057fbba04133e4a7a32880bc559 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 12 Aug 2023 12:39:55 -0700 Subject: [PATCH 05/11] lint Signed-off-by: Kevin Su --- .../plugins/k8s/kfoperators/common/common_operator_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go b/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go index dc844b866..7f74e3f58 100644 --- a/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go +++ b/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go @@ -37,11 +37,11 @@ func TestExtractCurrentCondition(t *testing.T) { jobConditions = nil currentCondition, err = ExtractCurrentCondition(jobConditions) + assert.NoError(t, err) assert.Equal(t, currentCondition, commonOp.JobCondition{}) - jobCreating := commonOp.JobCondition{} - jobConditions = []commonOp.JobCondition{jobCreating} - currentCondition, err = ExtractCurrentCondition(jobConditions) + currentCondition, err = ExtractCurrentCondition(nil) + assert.NoError(t, err) assert.Equal(t, currentCondition, commonOp.JobCondition{}) jobUnknown := commonOp.JobCondition{Type: "unknown"} From 8180cda50ed502ca2e36c6c02b2dc045ec138691 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 21 Aug 2023 17:33:10 -0700 Subject: [PATCH 06/11] add timeout Signed-off-by: Kevin Su --- .../plugins/k8s/kfoperators/pytorch/config.go | 31 +++++ .../k8s/kfoperators/pytorch/config_flags.go | 55 +++++++++ .../kfoperators/pytorch/config_flags_test.go | 116 ++++++++++++++++++ .../k8s/kfoperators/pytorch/pytorch.go | 3 + 4 files changed, 205 insertions(+) create mode 100644 go/tasks/plugins/k8s/kfoperators/pytorch/config.go create mode 100755 go/tasks/plugins/k8s/kfoperators/pytorch/config_flags.go create mode 100755 go/tasks/plugins/k8s/kfoperators/pytorch/config_flags_test.go diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/config.go b/go/tasks/plugins/k8s/kfoperators/pytorch/config.go new file mode 100644 index 000000000..70d1875f0 --- /dev/null +++ b/go/tasks/plugins/k8s/kfoperators/pytorch/config.go @@ -0,0 +1,31 @@ +package pytorch + +import ( + pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config" + "github.com/flyteorg/flytestdlib/config" + "time" +) + +//go:generate pflags Config --default-var=defaultConfig + +var ( + defaultConfig = Config{ + Timeout: config.Duration{Duration: 1 * time.Minute}, + } + + configSection = pluginsConfig.MustRegisterSubSection("pytorch", &defaultConfig) +) + +// Config is config for 'ray' plugin +type Config struct { + // If kubeflow operator doesn't update the status of the task after this timeout, the task will be considered failed. + Timeout config.Duration `json:"timeout,omitempty"` +} + +func GetConfig() *Config { + return configSection.GetConfig().(*Config) +} + +func SetConfig(cfg *Config) error { + return configSection.SetConfig(cfg) +} diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/config_flags.go b/go/tasks/plugins/k8s/kfoperators/pytorch/config_flags.go new file mode 100755 index 000000000..4a7e6fbf5 --- /dev/null +++ b/go/tasks/plugins/k8s/kfoperators/pytorch/config_flags.go @@ -0,0 +1,55 @@ +// Code generated by go generate; DO NOT EDIT. +// This file was generated by robots. + +package pytorch + +import ( + "encoding/json" + "reflect" + + "fmt" + + "github.com/spf13/pflag" +) + +// If v is a pointer, it will get its element value or the zero value of the element type. +// If v is not a pointer, it will return it as is. +func (Config) elemValueOrNil(v interface{}) interface{} { + if t := reflect.TypeOf(v); t.Kind() == reflect.Ptr { + if reflect.ValueOf(v).IsNil() { + return reflect.Zero(t.Elem()).Interface() + } else { + return reflect.ValueOf(v).Interface() + } + } else if v == nil { + return reflect.Zero(t).Interface() + } + + return v +} + +func (Config) mustJsonMarshal(v interface{}) string { + raw, err := json.Marshal(v) + if err != nil { + panic(err) + } + + return string(raw) +} + +func (Config) mustMarshalJSON(v json.Marshaler) string { + raw, err := v.MarshalJSON() + if err != nil { + panic(err) + } + + return string(raw) +} + +// GetPFlagSet will return strongly types pflags for all fields in Config and its nested types. The format of the +// flags is json-name.json-sub-name... etc. +func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { + cmdFlags := pflag.NewFlagSet("Config", pflag.ExitOnError) + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "timeout"), defaultConfig.Timeout.String(), "") + return cmdFlags +} diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/config_flags_test.go b/go/tasks/plugins/k8s/kfoperators/pytorch/config_flags_test.go new file mode 100755 index 000000000..9a305cd53 --- /dev/null +++ b/go/tasks/plugins/k8s/kfoperators/pytorch/config_flags_test.go @@ -0,0 +1,116 @@ +// Code generated by go generate; DO NOT EDIT. +// This file was generated by robots. + +package pytorch + +import ( + "encoding/json" + "fmt" + "reflect" + "strings" + "testing" + + "github.com/mitchellh/mapstructure" + "github.com/stretchr/testify/assert" +) + +var dereferencableKindsConfig = map[reflect.Kind]struct{}{ + reflect.Array: {}, reflect.Chan: {}, reflect.Map: {}, reflect.Ptr: {}, reflect.Slice: {}, +} + +// Checks if t is a kind that can be dereferenced to get its underlying type. +func canGetElementConfig(t reflect.Kind) bool { + _, exists := dereferencableKindsConfig[t] + return exists +} + +// This decoder hook tests types for json unmarshaling capability. If implemented, it uses json unmarshal to build the +// object. Otherwise, it'll just pass on the original data. +func jsonUnmarshalerHookConfig(_, to reflect.Type, data interface{}) (interface{}, error) { + unmarshalerType := reflect.TypeOf((*json.Unmarshaler)(nil)).Elem() + if to.Implements(unmarshalerType) || reflect.PtrTo(to).Implements(unmarshalerType) || + (canGetElementConfig(to.Kind()) && to.Elem().Implements(unmarshalerType)) { + + raw, err := json.Marshal(data) + if err != nil { + fmt.Printf("Failed to marshal Data: %v. Error: %v. Skipping jsonUnmarshalHook", data, err) + return data, nil + } + + res := reflect.New(to).Interface() + err = json.Unmarshal(raw, &res) + if err != nil { + fmt.Printf("Failed to umarshal Data: %v. Error: %v. Skipping jsonUnmarshalHook", data, err) + return data, nil + } + + return res, nil + } + + return data, nil +} + +func decode_Config(input, result interface{}) error { + config := &mapstructure.DecoderConfig{ + TagName: "json", + WeaklyTypedInput: true, + Result: result, + DecodeHook: mapstructure.ComposeDecodeHookFunc( + mapstructure.StringToTimeDurationHookFunc(), + mapstructure.StringToSliceHookFunc(","), + jsonUnmarshalerHookConfig, + ), + } + + decoder, err := mapstructure.NewDecoder(config) + if err != nil { + return err + } + + return decoder.Decode(input) +} + +func join_Config(arr interface{}, sep string) string { + listValue := reflect.ValueOf(arr) + strs := make([]string, 0, listValue.Len()) + for i := 0; i < listValue.Len(); i++ { + strs = append(strs, fmt.Sprintf("%v", listValue.Index(i))) + } + + return strings.Join(strs, sep) +} + +func testDecodeJson_Config(t *testing.T, val, result interface{}) { + assert.NoError(t, decode_Config(val, result)) +} + +func testDecodeRaw_Config(t *testing.T, vStringSlice, result interface{}) { + assert.NoError(t, decode_Config(vStringSlice, result)) +} + +func TestConfig_GetPFlagSet(t *testing.T) { + val := Config{} + cmdFlags := val.GetPFlagSet("") + assert.True(t, cmdFlags.HasFlags()) +} + +func TestConfig_SetFlags(t *testing.T) { + actual := Config{} + cmdFlags := actual.GetPFlagSet("") + assert.True(t, cmdFlags.HasFlags()) + + t.Run("Test_timeout", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := defaultConfig.Timeout.String() + + cmdFlags.Set("timeout", testValue) + if vString, err := cmdFlags.GetString("timeout"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.Timeout) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) +} diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go index 8652dc7d3..1221f4d53 100644 --- a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go +++ b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go @@ -231,6 +231,9 @@ func (pytorchOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginCont return pluginsCore.PhaseInfoUndefined, err } + if app.Status.StartTime == nil && app.CreationTimestamp.Add(GetConfig().Timeout.Duration).Before(time.Now()) { + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the pytorch CR since creation time %v", app.CreationTimestamp) + } currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions) if err != nil { return pluginsCore.PhaseInfoUndefined, err From 3acb2b8d8e0205b7ac97651652d4ab63c93b7c04 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 22 Aug 2023 10:14:06 -0700 Subject: [PATCH 07/11] updated comment Signed-off-by: Kevin Su --- go/tasks/plugins/k8s/kfoperators/pytorch/config.go | 2 +- go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/config.go b/go/tasks/plugins/k8s/kfoperators/pytorch/config.go index 70d1875f0..cfffc9744 100644 --- a/go/tasks/plugins/k8s/kfoperators/pytorch/config.go +++ b/go/tasks/plugins/k8s/kfoperators/pytorch/config.go @@ -16,7 +16,7 @@ var ( configSection = pluginsConfig.MustRegisterSubSection("pytorch", &defaultConfig) ) -// Config is config for 'ray' plugin +// Config is config for 'pytorch' plugin type Config struct { // If kubeflow operator doesn't update the status of the task after this timeout, the task will be considered failed. Timeout config.Duration `json:"timeout,omitempty"` diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go index 1221f4d53..82a9490a4 100644 --- a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go +++ b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go @@ -232,7 +232,7 @@ func (pytorchOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginCont } if app.Status.StartTime == nil && app.CreationTimestamp.Add(GetConfig().Timeout.Duration).Before(time.Now()) { - return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the pytorch CR since creation time %v", app.CreationTimestamp) + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the pytorch coustum resource since creation time %v", app.CreationTimestamp) } currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions) if err != nil { From a53c761c8db158f31a26002e33db94540449f7f3 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 22 Aug 2023 10:55:25 -0700 Subject: [PATCH 08/11] fixed tests Signed-off-by: Kevin Su --- go/tasks/plugins/k8s/kfoperators/pytorch/config.go | 3 ++- go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/config.go b/go/tasks/plugins/k8s/kfoperators/pytorch/config.go index cfffc9744..18abe7920 100644 --- a/go/tasks/plugins/k8s/kfoperators/pytorch/config.go +++ b/go/tasks/plugins/k8s/kfoperators/pytorch/config.go @@ -1,9 +1,10 @@ package pytorch import ( + "time" + pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config" "github.com/flyteorg/flytestdlib/config" - "time" ) //go:generate pflags Config --default-var=defaultConfig diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go index 8122a85e3..69b2cb556 100644 --- a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go +++ b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go @@ -293,8 +293,9 @@ func dummyPytorchJobResource(pytorchResourceHandler pytorchOperatorResourceHandl return &kubeflowv1.PyTorchJob{ ObjectMeta: v1.ObjectMeta{ - Name: jobName, - Namespace: jobNamespace, + CreationTimestamp: v1.Time{Time: time.Now()}, + Name: jobName, + Namespace: jobNamespace, }, Spec: resource.(*kubeflowv1.PyTorchJob).Spec, Status: commonOp.JobStatus{ From eeb9f2bd2594f0cab5eef08a3125f56c699e6f74 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 29 Aug 2023 12:48:29 -0700 Subject: [PATCH 09/11] move config to common Signed-off-by: Kevin Su --- .../plugins/k8s/kfoperators/{pytorch => common}/config.go | 2 +- .../k8s/kfoperators/{pytorch => common}/config_flags.go | 2 +- .../k8s/kfoperators/{pytorch => common}/config_flags_test.go | 2 +- go/tasks/plugins/k8s/kfoperators/mpi/mpi.go | 3 +++ go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go | 2 +- go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go | 2 +- go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go | 4 ++++ .../plugins/k8s/kfoperators/tensorflow/tensorflow_test.go | 2 +- 8 files changed, 13 insertions(+), 6 deletions(-) rename go/tasks/plugins/k8s/kfoperators/{pytorch => common}/config.go (97%) rename go/tasks/plugins/k8s/kfoperators/{pytorch => common}/config_flags.go (98%) rename go/tasks/plugins/k8s/kfoperators/{pytorch => common}/config_flags_test.go (99%) diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/config.go b/go/tasks/plugins/k8s/kfoperators/common/config.go similarity index 97% rename from go/tasks/plugins/k8s/kfoperators/pytorch/config.go rename to go/tasks/plugins/k8s/kfoperators/common/config.go index 18abe7920..61a52f3d1 100644 --- a/go/tasks/plugins/k8s/kfoperators/pytorch/config.go +++ b/go/tasks/plugins/k8s/kfoperators/common/config.go @@ -1,4 +1,4 @@ -package pytorch +package common import ( "time" diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/config_flags.go b/go/tasks/plugins/k8s/kfoperators/common/config_flags.go similarity index 98% rename from go/tasks/plugins/k8s/kfoperators/pytorch/config_flags.go rename to go/tasks/plugins/k8s/kfoperators/common/config_flags.go index 4a7e6fbf5..9fb5c0297 100755 --- a/go/tasks/plugins/k8s/kfoperators/pytorch/config_flags.go +++ b/go/tasks/plugins/k8s/kfoperators/common/config_flags.go @@ -1,7 +1,7 @@ // Code generated by go generate; DO NOT EDIT. // This file was generated by robots. -package pytorch +package common import ( "encoding/json" diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/config_flags_test.go b/go/tasks/plugins/k8s/kfoperators/common/config_flags_test.go similarity index 99% rename from go/tasks/plugins/k8s/kfoperators/pytorch/config_flags_test.go rename to go/tasks/plugins/k8s/kfoperators/common/config_flags_test.go index 9a305cd53..0afdf456c 100755 --- a/go/tasks/plugins/k8s/kfoperators/pytorch/config_flags_test.go +++ b/go/tasks/plugins/k8s/kfoperators/common/config_flags_test.go @@ -1,7 +1,7 @@ // Code generated by go generate; DO NOT EDIT. // This file was generated by robots. -package pytorch +package common import ( "encoding/json" diff --git a/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go b/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go index 7123dff52..f792d9bc6 100644 --- a/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go +++ b/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go @@ -209,6 +209,9 @@ func (mpiOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginContext if err != nil { return pluginsCore.PhaseInfoUndefined, err } + if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) { + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the mpi coustum resource since creation time %v", app.CreationTimestamp) + } currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions) if err != nil { return pluginsCore.PhaseInfoUndefined, err diff --git a/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go b/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go index fbced8085..4a442b29a 100644 --- a/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go +++ b/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go @@ -281,7 +281,7 @@ func dummyMPIJobResource(mpiResourceHandler mpiOperatorResourceHandler, Status: mpiOp.JobStatus{ Conditions: jobConditions, ReplicaStatuses: nil, - StartTime: nil, + StartTime: &v1.Time{Time: time.Now()}, CompletionTime: nil, LastReconcileTime: nil, }, diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go index 82a9490a4..f25712db6 100644 --- a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go +++ b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go @@ -231,7 +231,7 @@ func (pytorchOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginCont return pluginsCore.PhaseInfoUndefined, err } - if app.Status.StartTime == nil && app.CreationTimestamp.Add(GetConfig().Timeout.Duration).Before(time.Now()) { + if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) { return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the pytorch coustum resource since creation time %v", app.CreationTimestamp) } currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions) diff --git a/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go b/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go index 5e2ed948c..6e7a03145 100644 --- a/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go +++ b/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go @@ -209,6 +209,10 @@ func (tensorflowOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginC return pluginsCore.PhaseInfoUndefined, err } + if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) { + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the tensorflow coustum resource since creation time %v", app.CreationTimestamp) + } + currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions) if err != nil { return pluginsCore.PhaseInfoUndefined, err diff --git a/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go b/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go index 254d3efc1..5ec5658d8 100644 --- a/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go +++ b/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go @@ -283,7 +283,7 @@ func dummyTensorFlowJobResource(tensorflowResourceHandler tensorflowOperatorReso Status: commonOp.JobStatus{ Conditions: jobConditions, ReplicaStatuses: nil, - StartTime: nil, + StartTime: &v1.Time{Time: time.Now()}, CompletionTime: nil, LastReconcileTime: nil, }, From 0a0e00d76ce977619b5fcdb0bd9a712097842e17 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 30 Aug 2023 12:12:17 -0700 Subject: [PATCH 10/11] update config name Signed-off-by: Kevin Su --- go/tasks/plugins/k8s/kfoperators/common/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/tasks/plugins/k8s/kfoperators/common/config.go b/go/tasks/plugins/k8s/kfoperators/common/config.go index 61a52f3d1..cac5cd304 100644 --- a/go/tasks/plugins/k8s/kfoperators/common/config.go +++ b/go/tasks/plugins/k8s/kfoperators/common/config.go @@ -14,7 +14,7 @@ var ( Timeout: config.Duration{Duration: 1 * time.Minute}, } - configSection = pluginsConfig.MustRegisterSubSection("pytorch", &defaultConfig) + configSection = pluginsConfig.MustRegisterSubSection("common", &defaultConfig) ) // Config is config for 'pytorch' plugin From d5e4b59952483076e84f451cdd30c52b83829b61 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 31 Aug 2023 10:27:44 -0700 Subject: [PATCH 11/11] update config Signed-off-by: Kevin Su --- go/tasks/plugins/k8s/kfoperators/common/config.go | 2 +- go/tasks/plugins/k8s/kfoperators/mpi/mpi.go | 2 +- go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go | 2 +- go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/tasks/plugins/k8s/kfoperators/common/config.go b/go/tasks/plugins/k8s/kfoperators/common/config.go index cac5cd304..7803e9097 100644 --- a/go/tasks/plugins/k8s/kfoperators/common/config.go +++ b/go/tasks/plugins/k8s/kfoperators/common/config.go @@ -14,7 +14,7 @@ var ( Timeout: config.Duration{Duration: 1 * time.Minute}, } - configSection = pluginsConfig.MustRegisterSubSection("common", &defaultConfig) + configSection = pluginsConfig.MustRegisterSubSection("kf-operator", &defaultConfig) ) // Config is config for 'pytorch' plugin diff --git a/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go b/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go index f792d9bc6..7b6974ef8 100644 --- a/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go +++ b/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go @@ -210,7 +210,7 @@ func (mpiOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginContext return pluginsCore.PhaseInfoUndefined, err } if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) { - return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the mpi coustum resource since creation time %v", app.CreationTimestamp) + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the mpi custom resource since creation time %v", app.CreationTimestamp) } currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions) if err != nil { diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go index f25712db6..d6f18b1a3 100644 --- a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go +++ b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go @@ -232,7 +232,7 @@ func (pytorchOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginCont } if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) { - return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the pytorch coustum resource since creation time %v", app.CreationTimestamp) + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the pytorch custom resource since creation time %v", app.CreationTimestamp) } currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions) if err != nil { diff --git a/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go b/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go index 6e7a03145..fb006935b 100644 --- a/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go +++ b/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go @@ -210,7 +210,7 @@ func (tensorflowOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginC } if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) { - return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the tensorflow coustum resource since creation time %v", app.CreationTimestamp) + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the tensorflow custom resource since creation time %v", app.CreationTimestamp) } currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)