diff --git a/pkg/controllers/mpi_job_controller.go b/pkg/controllers/mpi_job_controller.go
index 035a3ad..46a8751 100644
--- a/pkg/controllers/mpi_job_controller.go
+++ b/pkg/controllers/mpi_job_controller.go
@@ -28,6 +28,8 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	metav1unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	pruntime "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	appsinformers "k8s.io/client-go/informers/apps/v1"
@@ -182,7 +184,13 @@ func NewMPIJobController(
 	mpiJobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: controller.enqueueMPIJob,
 		UpdateFunc: func(old, new interface{}) {
-			controller.enqueueMPIJob(new)
+			job, err := MPIJobFromUnstructured(old)
+			if err != nil {
+				return
+			}
+			if !(job.Status.LauncherStatus == kubeflow.LauncherSucceeded || job.Status.LauncherStatus == kubeflow.LauncherFailed) || job.DeletionTimestamp != nil {
+				controller.enqueueMPIJob(new)
+			}
 		},
 	})
 
@@ -1123,3 +1131,17 @@ func newLauncher(mpiJob *kubeflow.MPIJob, kubectlDeliveryImage string) *batchv1.
 		},
 	}
 }
+
+func MPIJobFromUnstructured(obj interface{}) (*kubeflow.MPIJob, error) {
+	un, ok := obj.(*metav1unstructured.Unstructured)
+	if !ok {
+		return nil, fmt.Errorf("failed to get MPIJob from key")
+	}
+	var job kubeflow.MPIJob
+	err := pruntime.DefaultUnstructuredConverter.FromUnstructured(un.Object, &job)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal the object to MPIJob")
+	}
+
+	return &job, nil
+}
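
For reference, below is a minimal, self-contained sketch (not part of the patch) of the unstructured-to-typed conversion pattern that MPIJobFromUnstructured relies on, i.e. turning the *unstructured.Unstructured delivered by an informer event into a typed object via runtime.DefaultUnstructuredConverter. The jobLike struct and jobFromUnstructured helper are hypothetical stand-ins for the generated kubeflow.MPIJob type and the new helper; only the type assertion and the converter call mirror the patch.

// conversion_sketch.go: illustrative only, assumes k8s.io/apimachinery is on the module path.
package main

import (
	"fmt"

	metav1unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	pruntime "k8s.io/apimachinery/pkg/runtime"
)

// jobLike mimics just the field the UpdateFunc filter inspects; the real
// controller decodes into the full kubeflow.MPIJob type instead.
type jobLike struct {
	Status struct {
		LauncherStatus string `json:"launcherStatus"`
	} `json:"status"`
}

func jobFromUnstructured(obj interface{}) (*jobLike, error) {
	// Informer handlers receive interface{}; a custom-resource event from an
	// unstructured/dynamic informer arrives as *unstructured.Unstructured.
	un, ok := obj.(*metav1unstructured.Unstructured)
	if !ok {
		return nil, fmt.Errorf("expected *unstructured.Unstructured, got %T", obj)
	}
	var job jobLike
	// FromUnstructured walks the underlying map and fills the typed struct
	// according to its JSON tags.
	if err := pruntime.DefaultUnstructuredConverter.FromUnstructured(un.Object, &job); err != nil {
		return nil, fmt.Errorf("failed to convert unstructured object: %v", err)
	}
	return &job, nil
}

func main() {
	// Build an Unstructured by hand, the way a watch event would deliver it.
	un := &metav1unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "kubeflow.org/v1alpha1",
		"kind":       "MPIJob",
		"status":     map[string]interface{}{"launcherStatus": "Succeeded"},
	}}
	job, err := jobFromUnstructured(un)
	if err != nil {
		panic(err)
	}
	// With a terminal launcher status like this one (and no deletion timestamp),
	// the patched UpdateFunc would skip re-enqueueing the job.
	fmt.Println("launcher status:", job.Status.LauncherStatus)
}

One note on the design choice, as I read the patch: filtering in UpdateFunc on the old object's launcher status means periodic resyncs and status-only updates of already-completed MPIJobs no longer re-enqueue work, while jobs with a deletion timestamp are still enqueued so cleanup proceeds.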