From 7605234c409bc31edca63e855c25828c0411c0df Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Wed, 23 Jun 2021 10:40:30 +0100 Subject: [PATCH] Add Job mode property (#52) --- api/v1beta1/flinkcluster_default.go | 4 ++++ api/v1beta1/flinkcluster_default_test.go | 4 ++++ api/v1beta1/flinkcluster_types.go | 11 +++++++++++ api/v1beta1/flinkcluster_validate.go | 17 +++++++++++++++++ api/v1beta1/flinkcluster_validate_test.go | 4 ++++ api/v1beta1/zz_generated.deepcopy.go | 5 +++++ .../flinkoperator.k8s.io_flinkclusters.yaml | 3 ++- controllers/flinkcluster_converter.go | 17 ++++++++++++----- controllers/flinkcluster_converter_test.go | 2 ++ 9 files changed, 61 insertions(+), 6 deletions(-) diff --git a/api/v1beta1/flinkcluster_default.go b/api/v1beta1/flinkcluster_default.go index 380120ef..2f8471e6 100644 --- a/api/v1beta1/flinkcluster_default.go +++ b/api/v1beta1/flinkcluster_default.go @@ -145,6 +145,10 @@ func _SetJobDefault(jobSpec *JobSpec) { AfterJobCancelled: CleanupActionDeleteCluster, } } + if jobSpec.Mode == nil { + jobSpec.Mode = new(JobMode) + *jobSpec.Mode = JobModeDetached + } } func _SetHadoopConfigDefault(hadoopConfig *HadoopConfig) { diff --git a/api/v1beta1/flinkcluster_default_test.go b/api/v1beta1/flinkcluster_default_test.go index cef7a71a..33c5b476 100644 --- a/api/v1beta1/flinkcluster_default_test.go +++ b/api/v1beta1/flinkcluster_default_test.go @@ -40,6 +40,7 @@ func TestSetDefault(t *testing.T) { } _SetDefault(&cluster) + var defaultJobMode JobMode = JobModeDetached var defaultJmReplicas = int32(1) var defaultJmRPCPort = int32(6123) var defaultJmBlobPort = int32(6124) @@ -107,6 +108,7 @@ func TestSetDefault(t *testing.T) { AfterJobCancelled: "DeleteCluster", }, SecurityContext: nil, + Mode: &defaultJobMode, }, FlinkProperties: nil, HadoopConfig: &HadoopConfig{ @@ -127,6 +129,7 @@ func TestSetDefault(t *testing.T) { // Tests non-default values are not overwritten unexpectedly. func TestSetNonDefault(t *testing.T) { + var defaultJobMode = JobMode(JobModeDetached) var jmReplicas = int32(2) var jmRPCPort = int32(8123) var jmBlobPort = int32(8124) @@ -260,6 +263,7 @@ func TestSetNonDefault(t *testing.T) { AfterJobFails: "DeleteCluster", AfterJobCancelled: "KeepCluster", }, + Mode: &defaultJobMode, }, FlinkProperties: nil, HadoopConfig: &HadoopConfig{ diff --git a/api/v1beta1/flinkcluster_types.go b/api/v1beta1/flinkcluster_types.go index 0283994e..ec61f642 100644 --- a/api/v1beta1/flinkcluster_types.go +++ b/api/v1beta1/flinkcluster_types.go @@ -41,6 +41,14 @@ const ( ComponentStateDeleted = "Deleted" ) +// JobMode defines the running mode for the job. +const ( + JobModeBlocking = "Blocking" + JobModeDetached = "Detached" +) + +type JobMode string + // JobState defines states for a Flink job deployment. const ( JobStatePending = "Pending" @@ -418,6 +426,9 @@ type JobSpec struct { Resources corev1.ResourceRequirements `json:"resources,omitempty"` SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"` + + // Job running mode + Mode *JobMode `json:"mode,omitempty"` } // FlinkClusterSpec defines the desired state of FlinkCluster diff --git a/api/v1beta1/flinkcluster_validate.go b/api/v1beta1/flinkcluster_validate.go index 6c95904e..f2db19cf 100644 --- a/api/v1beta1/flinkcluster_validate.go +++ b/api/v1beta1/flinkcluster_validate.go @@ -477,6 +477,13 @@ func (v *Validator) validateJob(jobSpec *JobSpec) error { "property `cancelRequested` cannot be set to true for a new job") } + if jobSpec.Mode == nil { + return fmt.Errorf("job mode is unspecified") + } + if err := v.validateJobMode("mode", *jobSpec.Mode); err != nil { + return err + } + return nil } @@ -528,6 +535,16 @@ func (v *Validator) validateCleanupAction( return nil } +func (v *Validator) validateJobMode(property string, value JobMode) error { + switch value { + case JobModeBlocking: + case JobModeDetached: + default: + return fmt.Errorf("invalid %v: %v", property, value) + } + return nil +} + func (v *Validator) validateRatio(ratio *int32, component, property string) error { if ratio == nil || *ratio > 100 || *ratio < 0 { return fmt.Errorf("invalid %v %v, it must be between 0 and 100", component, property) diff --git a/api/v1beta1/flinkcluster_validate_test.go b/api/v1beta1/flinkcluster_validate_test.go index d4840317..52a20c76 100644 --- a/api/v1beta1/flinkcluster_validate_test.go +++ b/api/v1beta1/flinkcluster_validate_test.go @@ -38,6 +38,7 @@ func TestValidateCreate(t *testing.T) { var parallelism int32 = 2 var restartPolicy = JobRestartPolicyFromSavepointOnFailure var memoryProcessRatio int32 = 25 + var jobMode JobMode = JobModeDetached var cluster = FlinkCluster{ ObjectMeta: metav1.ObjectMeta{ Name: "mycluster", @@ -77,6 +78,7 @@ func TestValidateCreate(t *testing.T) { AfterJobSucceeds: CleanupActionKeepCluster, AfterJobFails: CleanupActionDeleteTaskManager, }, + Mode: &jobMode, }, GCPConfig: &GCPConfig{ ServiceAccount: &GCPServiceAccount{ @@ -932,6 +934,7 @@ func getSimpleFlinkCluster() FlinkCluster { var parallelism int32 = 2 var restartPolicy = JobRestartPolicyFromSavepointOnFailure var savepointDir = "/savepoint_dir" + var jobMode JobMode = JobModeDetached return FlinkCluster{ ObjectMeta: metav1.ObjectMeta{ Name: "mycluster", @@ -974,6 +977,7 @@ func getSimpleFlinkCluster() FlinkCluster { AfterJobSucceeds: CleanupActionKeepCluster, AfterJobFails: CleanupActionDeleteTaskManager, }, + Mode: &jobMode, }, }, } diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index f6b53eaa..1ab93206 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -676,6 +676,11 @@ func (in *JobSpec) DeepCopyInto(out *JobSpec) { *out = new(v1.PodSecurityContext) (*in).DeepCopyInto(*out) } + if in.Mode != nil { + in, out := &in.Mode, &out.Mode + *out = new(JobMode) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobSpec. diff --git a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml index ce52805c..9823814c 100644 --- a/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml +++ b/config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml @@ -4,7 +4,6 @@ metadata: annotations: api-approved.kubernetes.io: unapproved controller-gen.kubebuilder.io/version: v0.5.1-0.20210408091555-18885b17ff7b - creationTimestamp: null name: flinkclusters.flinkoperator.k8s.io spec: group: flinkoperator.k8s.io @@ -725,6 +724,8 @@ spec: type: array jarFile: type: string + mode: + type: string noLoggingToStdout: type: boolean parallelism: diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go index 5f9daf7e..6d2b31f4 100644 --- a/controllers/flinkcluster_converter.go +++ b/controllers/flinkcluster_converter.go @@ -91,8 +91,8 @@ func getDesiredJobManagerStatefulSet( return nil } - var clusterNamespace = flinkCluster.ObjectMeta.Namespace - var clusterName = flinkCluster.ObjectMeta.Name + var clusterNamespace = flinkCluster.Namespace + var clusterName = flinkCluster.Name var clusterSpec = flinkCluster.Spec var imageSpec = clusterSpec.Image var serviceAccount = clusterSpec.ServiceAccountName @@ -255,8 +255,8 @@ func getDesiredJobManagerService( return nil } - var clusterNamespace = flinkCluster.ObjectMeta.Namespace - var clusterName = flinkCluster.ObjectMeta.Name + var clusterNamespace = flinkCluster.Namespace + var clusterName = flinkCluster.Name var jobManagerSpec = flinkCluster.Spec.JobManager var rpcPort = corev1.ServicePort{ Name: "rpc", @@ -689,7 +689,14 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job { *jobSpec.NoLoggingToStdout { jobArgs = append(jobArgs, "--sysoutLogging") } - jobArgs = append(jobArgs, "--detached") + + if jobSpec.Mode != nil { + switch *jobSpec.Mode { + case v1beta1.JobModeBlocking: + case v1beta1.JobModeDetached: + jobArgs = append(jobArgs, "--detached") + } + } var securityContext = jobSpec.SecurityContext diff --git a/controllers/flinkcluster_converter_test.go b/controllers/flinkcluster_converter_test.go index a6953602..e265e52c 100644 --- a/controllers/flinkcluster_converter_test.go +++ b/controllers/flinkcluster_converter_test.go @@ -55,6 +55,7 @@ func TestGetDesiredClusterState(t *testing.T) { var memoryOffHeapRatio int32 = 25 var memoryOffHeapMin = resource.MustParse("600M") var memoryProcessRatio int32 = 80 + var jobMode v1beta1.JobMode = v1beta1.JobModeDetached var jobBackoffLimit int32 = 0 var jmReadinessProbe = corev1.Probe{ Handler: corev1.Handler{ @@ -152,6 +153,7 @@ func TestGetDesiredClusterState(t *testing.T) { corev1.ResourceMemory: resource.MustParse("512Mi"), }, }, + Mode: &jobMode, RestartPolicy: &restartPolicy, Volumes: []corev1.Volume{ {