diff --git a/controllers/flinkcluster_updater.go b/controllers/flinkcluster_updater.go index 34a9e32d..200162e4 100644 --- a/controllers/flinkcluster_updater.go +++ b/controllers/flinkcluster_updater.go @@ -389,25 +389,26 @@ func (updater *ClusterStatusUpdater) deriveClusterStatus( } // (Optional) Job. - var jobStopped = false var jobStatus = updater.getJobStatus() status.Components.Job = jobStatus - if jobStatus != nil && - (jobStatus.State == v1beta1.JobStateSucceeded || - jobStatus.State == v1beta1.JobStateFailed || - jobStatus.State == v1beta1.JobStateCancelled || - jobStatus.State == v1beta1.JobStateSuspended) { - jobStopped = true - } // Derive the new cluster state. switch recorded.State { case "", v1beta1.ClusterStateCreating: if runningComponents < totalComponents { - if runningComponents == 0 { - status.State = v1beta1.ClusterStateStopped - } else { - status.State = v1beta1.ClusterStateCreating + status.State = v1beta1.ClusterStateCreating + if isJobStopped(jobStatus) { + var policy = observed.cluster.Spec.Job.CleanupPolicy + if jobStatus.State == v1beta1.JobStateSucceeded && + policy.AfterJobSucceeds != v1beta1.CleanupActionKeepCluster { + status.State = v1beta1.ClusterStateStopping + } else if jobStatus.State == v1beta1.JobStateFailed && + policy.AfterJobFails != v1beta1.CleanupActionKeepCluster { + status.State = v1beta1.ClusterStateStopping + } else if jobStatus.State == v1beta1.JobStateCancelled && + policy.AfterJobCancelled != v1beta1.CleanupActionKeepCluster { + status.State = v1beta1.ClusterStateStopping + } } } else { status.State = v1beta1.ClusterStateRunning @@ -428,7 +429,7 @@ func (updater *ClusterStatusUpdater) deriveClusterStatus( v1beta1.ClusterStateReconciling: if isClusterUpdating { status.State = v1beta1.ClusterStateUpdating - } else if jobStopped { + } else if isJobStopped(jobStatus) { var policy = observed.cluster.Spec.Job.CleanupPolicy if jobStatus.State == v1beta1.JobStateSucceeded && policy.AfterJobSucceeds != v1beta1.CleanupActionKeepCluster {