Skip to content

Commit

Permalink
Remove usage of embeded fields (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Jul 20, 2021
1 parent 4cd8ed3 commit 608da76
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 16 deletions.
5 changes: 5 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions controllers/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ func getDesiredJobManagerIngress(
return nil
}

var clusterNamespace = flinkCluster.ObjectMeta.Namespace
var clusterName = flinkCluster.ObjectMeta.Name
var clusterNamespace = flinkCluster.Namespace
var clusterName = flinkCluster.Name
var jobManagerServiceName = getJobManagerServiceName(clusterName)
var jobManagerServiceUIPort = intstr.FromString("ui")
var ingressName = getJobManagerIngressName(clusterName)
Expand Down Expand Up @@ -401,8 +401,8 @@ func getDesiredTaskManagerStatefulSet(
return nil
}

var clusterNamespace = flinkCluster.ObjectMeta.Namespace
var clusterName = flinkCluster.ObjectMeta.Name
var clusterNamespace = flinkCluster.Namespace
var clusterName = flinkCluster.Name
var clusterSpec = flinkCluster.Spec
var imageSpec = flinkCluster.Spec.Image
var serviceAccount = clusterSpec.ServiceAccountName
Expand Down Expand Up @@ -568,8 +568,8 @@ func getDesiredConfigMap(
return nil
}

var clusterNamespace = flinkCluster.ObjectMeta.Namespace
var clusterName = flinkCluster.ObjectMeta.Name
var clusterNamespace = flinkCluster.Namespace
var clusterName = flinkCluster.Name
var flinkProperties = flinkCluster.Spec.FlinkProperties
var jmPorts = flinkCluster.Spec.JobManager.Ports
var tmPorts = flinkCluster.Spec.TaskManager.Ports
Expand Down Expand Up @@ -645,7 +645,7 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
}

// Unless update has been triggered or the job needs to be restarted, keep the job to be stopped in that state.
if !(isUpdateTriggered(flinkCluster.Status) || shouldRestartJob(jobSpec.RestartPolicy, jobStatus)) {
if !isUpdateTriggered(flinkCluster.Status) && !shouldRestartJob(jobSpec.RestartPolicy, jobStatus) {
// Job cancel requested or stopped already
if isJobCancelRequested(*flinkCluster) || isJobStopped(jobStatus) {
return nil
Expand All @@ -656,8 +656,8 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
var imageSpec = clusterSpec.Image
var serviceAccount = clusterSpec.ServiceAccountName
var jobManagerSpec = clusterSpec.JobManager
var clusterNamespace = flinkCluster.ObjectMeta.Namespace
var clusterName = flinkCluster.ObjectMeta.Name
var clusterNamespace = flinkCluster.Namespace
var clusterName = flinkCluster.Name
var jobName = getJobName(clusterName)
var jobManagerServiceName = clusterName + "-jobmanager"
var jobManagerAddress = fmt.Sprintf(
Expand Down
14 changes: 7 additions & 7 deletions controllers/flinkcluster_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (updater *ClusterStatusUpdater) deriveClusterStatus(
recorded.Components.ConfigMap.DeepCopyInto(&status.Components.ConfigMap)
status.Components.ConfigMap.State = v1beta1.ComponentStateUpdating
} else if observedConfigMap != nil {
status.Components.ConfigMap.Name = observedConfigMap.ObjectMeta.Name
status.Components.ConfigMap.Name = observedConfigMap.Name
status.Components.ConfigMap.State = v1beta1.ComponentStateReady
} else if recorded.Components.ConfigMap.Name != "" {
status.Components.ConfigMap =
Expand All @@ -220,7 +220,7 @@ func (updater *ClusterStatusUpdater) deriveClusterStatus(
recorded.Components.JobManagerStatefulSet.DeepCopyInto(&status.Components.JobManagerStatefulSet)
status.Components.JobManagerStatefulSet.State = v1beta1.ComponentStateUpdating
} else if observedJmStatefulSet != nil {
status.Components.JobManagerStatefulSet.Name = observedJmStatefulSet.ObjectMeta.Name
status.Components.JobManagerStatefulSet.Name = observedJmStatefulSet.Name
status.Components.JobManagerStatefulSet.State = getStatefulSetState(observedJmStatefulSet)
if status.Components.JobManagerStatefulSet.State == v1beta1.ComponentStateReady {
runningComponents++
Expand Down Expand Up @@ -269,7 +269,7 @@ func (updater *ClusterStatusUpdater) deriveClusterStatus(

status.Components.JobManagerService =
v1beta1.JobManagerServiceStatus{
Name: observedJmService.ObjectMeta.Name,
Name: observedJmService.Name,
State: state,
NodePort: nodePort,
LoadBalancerIngress: loadBalancerIngress,
Expand Down Expand Up @@ -352,7 +352,7 @@ func (updater *ClusterStatusUpdater) deriveClusterStatus(

status.Components.JobManagerIngress =
&v1beta1.JobManagerIngressStatus{
Name: observedJmIngress.ObjectMeta.Name,
Name: observedJmIngress.Name,
State: state,
URLs: urls,
}
Expand All @@ -372,7 +372,7 @@ func (updater *ClusterStatusUpdater) deriveClusterStatus(
status.Components.TaskManagerStatefulSet.State = v1beta1.ComponentStateUpdating
} else if observedTmStatefulSet != nil {
status.Components.TaskManagerStatefulSet.Name =
observedTmStatefulSet.ObjectMeta.Name
observedTmStatefulSet.Name
status.Components.TaskManagerStatefulSet.State =
getStatefulSetState(observedTmStatefulSet)
if status.Components.TaskManagerStatefulSet.State ==
Expand All @@ -388,7 +388,7 @@ func (updater *ClusterStatusUpdater) deriveClusterStatus(
}

// (Optional) Job.
var jobStatus = updater.getJobStatus()
var jobStatus = updater.deriveJobStatus()
status.Components.Job = jobStatus

// Derive the new cluster state.
Expand Down Expand Up @@ -695,7 +695,7 @@ func (updater *ClusterStatusUpdater) getFlinkJobID() *string {
return nil
}

func (updater *ClusterStatusUpdater) getJobStatus() *v1beta1.JobStatus {
func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus {
var observed = updater.observed
var observedJob = updater.observed.job
var observedFlinkJob = updater.observed.flinkJobStatus.flinkJob
Expand Down

0 comments on commit 608da76

Please sign in to comment.