Skip to content

Commit

Permalink
Add Job mode property (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Jun 23, 2021
1 parent b6a57a5 commit 7605234
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 6 deletions.
4 changes: 4 additions & 0 deletions api/v1beta1/flinkcluster_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ func _SetJobDefault(jobSpec *JobSpec) {
AfterJobCancelled: CleanupActionDeleteCluster,
}
}
if jobSpec.Mode == nil {
jobSpec.Mode = new(JobMode)
*jobSpec.Mode = JobModeDetached
}
}

func _SetHadoopConfigDefault(hadoopConfig *HadoopConfig) {
Expand Down
4 changes: 4 additions & 0 deletions api/v1beta1/flinkcluster_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestSetDefault(t *testing.T) {
}
_SetDefault(&cluster)

var defaultJobMode JobMode = JobModeDetached
var defaultJmReplicas = int32(1)
var defaultJmRPCPort = int32(6123)
var defaultJmBlobPort = int32(6124)
Expand Down Expand Up @@ -107,6 +108,7 @@ func TestSetDefault(t *testing.T) {
AfterJobCancelled: "DeleteCluster",
},
SecurityContext: nil,
Mode: &defaultJobMode,
},
FlinkProperties: nil,
HadoopConfig: &HadoopConfig{
Expand All @@ -127,6 +129,7 @@ func TestSetDefault(t *testing.T) {

// Tests non-default values are not overwritten unexpectedly.
func TestSetNonDefault(t *testing.T) {
var defaultJobMode = JobMode(JobModeDetached)
var jmReplicas = int32(2)
var jmRPCPort = int32(8123)
var jmBlobPort = int32(8124)
Expand Down Expand Up @@ -260,6 +263,7 @@ func TestSetNonDefault(t *testing.T) {
AfterJobFails: "DeleteCluster",
AfterJobCancelled: "KeepCluster",
},
Mode: &defaultJobMode,
},
FlinkProperties: nil,
HadoopConfig: &HadoopConfig{
Expand Down
11 changes: 11 additions & 0 deletions api/v1beta1/flinkcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ const (
ComponentStateDeleted = "Deleted"
)

// JobMode defines the running mode for the job.
const (
JobModeBlocking = "Blocking"
JobModeDetached = "Detached"
)

type JobMode string

// JobState defines states for a Flink job deployment.
const (
JobStatePending = "Pending"
Expand Down Expand Up @@ -418,6 +426,9 @@ type JobSpec struct {
Resources corev1.ResourceRequirements `json:"resources,omitempty"`

SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`

// Job running mode
Mode *JobMode `json:"mode,omitempty"`
}

// FlinkClusterSpec defines the desired state of FlinkCluster
Expand Down
17 changes: 17 additions & 0 deletions api/v1beta1/flinkcluster_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,13 @@ func (v *Validator) validateJob(jobSpec *JobSpec) error {
"property `cancelRequested` cannot be set to true for a new job")
}

if jobSpec.Mode == nil {
return fmt.Errorf("job mode is unspecified")
}
if err := v.validateJobMode("mode", *jobSpec.Mode); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -528,6 +535,16 @@ func (v *Validator) validateCleanupAction(
return nil
}

func (v *Validator) validateJobMode(property string, value JobMode) error {
switch value {
case JobModeBlocking:
case JobModeDetached:
default:
return fmt.Errorf("invalid %v: %v", property, value)
}
return nil
}

func (v *Validator) validateRatio(ratio *int32, component, property string) error {
if ratio == nil || *ratio > 100 || *ratio < 0 {
return fmt.Errorf("invalid %v %v, it must be between 0 and 100", component, property)
Expand Down
4 changes: 4 additions & 0 deletions api/v1beta1/flinkcluster_validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestValidateCreate(t *testing.T) {
var parallelism int32 = 2
var restartPolicy = JobRestartPolicyFromSavepointOnFailure
var memoryProcessRatio int32 = 25
var jobMode JobMode = JobModeDetached
var cluster = FlinkCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "mycluster",
Expand Down Expand Up @@ -77,6 +78,7 @@ func TestValidateCreate(t *testing.T) {
AfterJobSucceeds: CleanupActionKeepCluster,
AfterJobFails: CleanupActionDeleteTaskManager,
},
Mode: &jobMode,
},
GCPConfig: &GCPConfig{
ServiceAccount: &GCPServiceAccount{
Expand Down Expand Up @@ -932,6 +934,7 @@ func getSimpleFlinkCluster() FlinkCluster {
var parallelism int32 = 2
var restartPolicy = JobRestartPolicyFromSavepointOnFailure
var savepointDir = "/savepoint_dir"
var jobMode JobMode = JobModeDetached
return FlinkCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "mycluster",
Expand Down Expand Up @@ -974,6 +977,7 @@ func getSimpleFlinkCluster() FlinkCluster {
AfterJobSucceeds: CleanupActionKeepCluster,
AfterJobFails: CleanupActionDeleteTaskManager,
},
Mode: &jobMode,
},
},
}
Expand Down
5 changes: 5 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ metadata:
annotations:
api-approved.kubernetes.io: unapproved
controller-gen.kubebuilder.io/version: v0.5.1-0.20210408091555-18885b17ff7b
creationTimestamp: null
name: flinkclusters.flinkoperator.k8s.io
spec:
group: flinkoperator.k8s.io
Expand Down Expand Up @@ -725,6 +724,8 @@ spec:
type: array
jarFile:
type: string
mode:
type: string
noLoggingToStdout:
type: boolean
parallelism:
Expand Down
17 changes: 12 additions & 5 deletions controllers/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func getDesiredJobManagerStatefulSet(
return nil
}

var clusterNamespace = flinkCluster.ObjectMeta.Namespace
var clusterName = flinkCluster.ObjectMeta.Name
var clusterNamespace = flinkCluster.Namespace
var clusterName = flinkCluster.Name
var clusterSpec = flinkCluster.Spec
var imageSpec = clusterSpec.Image
var serviceAccount = clusterSpec.ServiceAccountName
Expand Down Expand Up @@ -255,8 +255,8 @@ func getDesiredJobManagerService(
return nil
}

var clusterNamespace = flinkCluster.ObjectMeta.Namespace
var clusterName = flinkCluster.ObjectMeta.Name
var clusterNamespace = flinkCluster.Namespace
var clusterName = flinkCluster.Name
var jobManagerSpec = flinkCluster.Spec.JobManager
var rpcPort = corev1.ServicePort{
Name: "rpc",
Expand Down Expand Up @@ -689,7 +689,14 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
*jobSpec.NoLoggingToStdout {
jobArgs = append(jobArgs, "--sysoutLogging")
}
jobArgs = append(jobArgs, "--detached")

if jobSpec.Mode != nil {
switch *jobSpec.Mode {
case v1beta1.JobModeBlocking:
case v1beta1.JobModeDetached:
jobArgs = append(jobArgs, "--detached")
}
}

var securityContext = jobSpec.SecurityContext

Expand Down
2 changes: 2 additions & 0 deletions controllers/flinkcluster_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func TestGetDesiredClusterState(t *testing.T) {
var memoryOffHeapRatio int32 = 25
var memoryOffHeapMin = resource.MustParse("600M")
var memoryProcessRatio int32 = 80
var jobMode v1beta1.JobMode = v1beta1.JobModeDetached
var jobBackoffLimit int32 = 0
var jmReadinessProbe = corev1.Probe{
Handler: corev1.Handler{
Expand Down Expand Up @@ -152,6 +153,7 @@ func TestGetDesiredClusterState(t *testing.T) {
corev1.ResourceMemory: resource.MustParse("512Mi"),
},
},
Mode: &jobMode,
RestartPolicy: &restartPolicy,
Volumes: []corev1.Volume{
{
Expand Down

0 comments on commit 7605234

Please sign in to comment.