[FEAT] add driverPod/executorPod in Spark #6085
base: master
Conversation
Add driverPod/executorPod fields in the SparkJob class and use them as the Spark driver and executor pods.
Signed-off-by: machichima <[email protected]>
Codecov Report
Attention: Patch coverage is …

Additional details and impacted files:

```
@@            Coverage Diff            @@
##           master    #6085      +/-  ##
==========================================
- Coverage   37.10%   37.01%    -0.09%
==========================================
  Files        1318     1318
  Lines      132331   132578     +247
==========================================
- Hits        49097    49078      -19
- Misses     78961    79249     +288
+ Partials    4273     4251      -22
```

Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
fix protobuf number mismatch; pass K8sPod instead of annotations and labels separately
Signed-off-by: machichima <[email protected]>
successfully apply pods specified in SparkJob
Signed-off-by: machichima <[email protected]>
Force-pushed from ae39e8f to 394c269.
Force-pushed from 394c269 to da4199b.
Force-pushed from c3eed97 to 70cfdff.
```go
if k8sPod != nil && k8sPod.GetMetadata() != nil {
	if k8sPod.Metadata.Annotations != nil {
		annotations = pluginsUtils.UnionMaps(annotations, k8sPod.GetMetadata().GetAnnotations())
	}
	if k8sPod.Metadata.Labels != nil {
		labels = pluginsUtils.UnionMaps(labels, k8sPod.GetMetadata().GetLabels())
	}
```
nit: there is a nil check in `GetMetadata()`, so we don't need to do it here.
Suggested change:

```diff
-if k8sPod != nil && k8sPod.GetMetadata() != nil {
-	if k8sPod.Metadata.Annotations != nil {
-		annotations = pluginsUtils.UnionMaps(annotations, k8sPod.GetMetadata().GetAnnotations())
-	}
-	if k8sPod.Metadata.Labels != nil {
-		labels = pluginsUtils.UnionMaps(labels, k8sPod.GetMetadata().GetLabels())
-	}
+if k8sPod.GetMetadata().GetAnnotations() != nil {
+	annotations = pluginsUtils.UnionMaps(annotations, k8sPod.GetMetadata().GetAnnotations())
+}
+if k8sPod.GetMetadata().GetLabels() != nil {
+	labels = pluginsUtils.UnionMaps(labels, k8sPod.GetMetadata().GetLabels())
+}
```
Fixed. Thanks!
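For readers unfamiliar with the nil-safe getter behavior the nit relies on, here is a minimal self-contained sketch; the `Metadata` type is an illustrative stand-in, not the actual generated flyteidl code:

```go
package main

import "fmt"

// Illustrative stand-in for a protoc-generated message type.
type Metadata struct {
	Annotations map[string]string
}

// protoc-generated getters follow this nil-safe pattern, which is why
// an explicit nil check before calling them is redundant.
func (m *Metadata) GetAnnotations() map[string]string {
	if m != nil {
		return m.Annotations
	}
	return nil
}

func main() {
	var m *Metadata                        // nil receiver
	fmt.Println(m.GetAnnotations() == nil) // prints true; no panic
}
```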
```go
err = utils.UnmarshalStructToObj(executorPod.GetPodSpec(), &customPodSpec)
if err != nil {
	return nil, errors.Errorf(errors.BadTaskSpecification,
		"Unable to unmarshal pod spec [%v], Err: [%v]", executorPod.GetPodSpec(), err.Error())
```
"Unable to unmarshal pod spec [%v], Err: [%v]", executorPod.GetPodSpec(), err.Error()) | |
"Unable to unmarshal executor pod spec [%v], Err: [%v]", executorPod.GetPodSpec(), err.Error()) |
```go
err = utils.UnmarshalStructToObj(driverPod.GetPodSpec(), &customPodSpec)
if err != nil {
	return nil, errors.Errorf(errors.BadTaskSpecification,
		"Unable to unmarshal pod spec [%v], Err: [%v]", driverPod.GetPodSpec(), err.Error())
```
"Unable to unmarshal pod spec [%v], Err: [%v]", driverPod.GetPodSpec(), err.Error()) | |
"Unable to unmarshal driver pod spec [%v], Err: [%v]", driverPod.GetPodSpec(), err.Error()) |
```go
// of c.Name
if val, ok := taskTemplate.GetConfig()[PrimaryContainerKey]; ok {
	primaryContainerName = val
	c.Name = primaryContainerName
```
Could we add a small unit test for it?
I am wondering whether this part is needed. As far as I know, to set the primary container name we need to define it via `pod_template=PodTemplate(primary_container_name="primary")`. But if we set that, we end up in `case *core.TaskTemplate_K8SPod` here, so `if val, ok := taskTemplate.GetConfig()[PrimaryContainerKey]` will never be true. I am thinking of removing this and using `TaskTemplate_K8SPod` in `spark_test.go` instead.
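To make the dispatch in the reply above concrete, here is a small sketch of switching on the task target; the import path and type names are assumed from flyteidl's generated Go code:

```go
package main

import (
	"fmt"

	core "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
)

// primaryNameSource reports where the primary container name would come
// from for a given task target; it mirrors the branching described above.
func primaryNameSource(tt *core.TaskTemplate) string {
	switch tt.GetTarget().(type) {
	case *core.TaskTemplate_Container:
		return "config[PrimaryContainerKey] (plain container task)"
	case *core.TaskTemplate_K8SPod:
		return "pod template path (TaskTemplate_K8SPod)"
	default:
		return "unknown target"
	}
}

func main() {
	podTask := &core.TaskTemplate{
		Target: &core.TaskTemplate_K8SPod{K8SPod: &core.K8SPod{}},
	}
	fmt.Println(primaryNameSource(podTask)) // pod template path
}
```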
Code Review Agent Run #ebad2b
Actionable Suggestions: 8
Additional Suggestions: 1

Changelist by Bito: This pull request implements the following key changes.
```go
if val, ok := taskTemplate.GetConfig()[PrimaryContainerKey]; ok {
	primaryContainerName = val
	c.Name = primaryContainerName
} else {
	primaryContainerName = c.Name
}
```
Consider handling an empty string value for `PrimaryContainerKey` in the task template config to avoid potential issues with container naming.

Code suggestion (check the AI-generated fix before applying):
```diff
 if val, ok := taskTemplate.GetConfig()[PrimaryContainerKey]; ok {
-	primaryContainerName = val
-	c.Name = primaryContainerName
+	if val != "" {
+		primaryContainerName = val
+		c.Name = primaryContainerName
+	}
 } else {
 	primaryContainerName = c.Name
 }
```
Code Review Run #ebad2b
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
```diff
@@ -563,7 +570,7 @@
 	}

 	// merge podSpec with podTemplate
-	mergedPodSpec, err := mergePodSpecs(&podTemplate.Template.Spec, podSpec, primaryContainerName, primaryInitContainerName)
+	mergedPodSpec, err := MergePodSpecs(&podTemplate.Template.Spec, podSpec, primaryContainerName, primaryInitContainerName)
```
Consider whether the function name change from `mergePodSpecs` to `MergePodSpecs` is intentional, as it changes the visibility of the function from package-private to public. This could impact API stability and usage patterns. A similar issue was also found in flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go (lines 2050-2144).
Code suggestion (check the AI-generated fix before applying):

```diff
-mergedPodSpec, err := MergePodSpecs(&podTemplate.Template.Spec, podSpec, primaryContainerName, primaryInitContainerName)
+mergedPodSpec, err := mergePodSpecs(&podTemplate.Template.Spec, podSpec, primaryContainerName, primaryInitContainerName)
```
Code Review Run #ebad2b
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
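Background on the visibility point: in Go, the case of an identifier's first letter decides whether it is exported from its package, so the rename is an API-surface change rather than a cosmetic one (and, given the Spark plugin now calls it from another package, presumably intentional). A tiny sketch:

```go
// Package-level visibility in Go is determined by the identifier's
// first letter; this is why mergePodSpecs -> MergePodSpecs widens
// the public API of the flytek8s package.
package flytek8s

func mergePodSpecs() {} // unexported: only callable inside flytek8s
func MergePodSpecs() {} // exported: callable by any importing package, e.g. the Spark plugin
```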
```diff
@@ -65,7 +66,7 @@
 	}

 	sparkJob := plugins.SparkJob{}
-	err = utils.UnmarshalStruct(taskTemplate.GetCustom(), &sparkJob)
+	err = utils.UnmarshalStructToPb(taskTemplate.GetCustom(), &sparkJob)
```
Consider using `UnmarshalStruct` instead of `UnmarshalStructToPb` since `sparkJob` is not a protobuf message but a regular struct.
Code suggestion (check the AI-generated fix before applying):

```diff
-err = utils.UnmarshalStructToPb(taskTemplate.GetCustom(), &sparkJob)
+err = utils.UnmarshalStruct(taskTemplate.GetCustom(), &sparkJob)
```
Code Review Run #ebad2b
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
```go
annotations := pluginsUtils.UnionMaps(
	config.GetK8sPluginConfig().DefaultAnnotations,
	pluginsUtils.CopyMap(taskCtx.TaskExecutionMetadata().GetAnnotations()),
)
labels := pluginsUtils.UnionMaps(
	config.GetK8sPluginConfig().DefaultLabels,
	pluginsUtils.CopyMap(taskCtx.TaskExecutionMetadata().GetLabels()),
)
if k8sPod.GetMetadata().GetAnnotations() != nil {
	annotations = pluginsUtils.UnionMaps(annotations, k8sPod.GetMetadata().GetAnnotations())
}
if k8sPod.GetMetadata().GetLabels() != nil {
	labels = pluginsUtils.UnionMaps(labels, k8sPod.GetMetadata().GetLabels())
}
```
Consider extracting the annotation and label merging logic into a separate helper function since it's used in multiple places. This would improve code maintainability and reduce duplication.
Code suggestion (check the AI-generated fix before applying):

```diff
-annotations := pluginsUtils.UnionMaps(
-	config.GetK8sPluginConfig().DefaultAnnotations,
-	pluginsUtils.CopyMap(taskCtx.TaskExecutionMetadata().GetAnnotations()),
-)
-labels := pluginsUtils.UnionMaps(
-	config.GetK8sPluginConfig().DefaultLabels,
-	pluginsUtils.CopyMap(taskCtx.TaskExecutionMetadata().GetLabels()),
-)
-if k8sPod.GetMetadata().GetAnnotations() != nil {
-	annotations = pluginsUtils.UnionMaps(annotations, k8sPod.GetMetadata().GetAnnotations())
-}
-if k8sPod.GetMetadata().GetLabels() != nil {
-	labels = pluginsUtils.UnionMaps(labels, k8sPod.GetMetadata().GetLabels())
-}
+annotations, labels := mergeMetadata(config.GetK8sPluginConfig().DefaultAnnotations, config.GetK8sPluginConfig().DefaultLabels, taskCtx.TaskExecutionMetadata(), k8sPod)
```
Code Review Run #ebad2b
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
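For what it's worth, the `mergeMetadata` helper the suggestion calls does not exist in the codebase. If one did want to extract it, a sketch under those assumptions (hypothetical name and signature; import paths assumed from the plugin code) might look like:

```go
package spark

import (
	core "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
	pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
	pluginsUtils "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils"
)

// mergeMetadata is hypothetical: it layers plugin defaults, task
// execution metadata, and K8sPod overrides, in increasing order of
// precedence, and returns the merged annotations and labels.
func mergeMetadata(defaultAnnotations, defaultLabels map[string]string,
	meta pluginsCore.TaskExecutionMetadata, k8sPod *core.K8SPod) (map[string]string, map[string]string) {

	annotations := pluginsUtils.UnionMaps(defaultAnnotations, pluginsUtils.CopyMap(meta.GetAnnotations()))
	labels := pluginsUtils.UnionMaps(defaultLabels, pluginsUtils.CopyMap(meta.GetLabels()))
	if k8sPod.GetMetadata().GetAnnotations() != nil {
		annotations = pluginsUtils.UnionMaps(annotations, k8sPod.GetMetadata().GetAnnotations())
	}
	if k8sPod.GetMetadata().GetLabels() != nil {
		labels = pluginsUtils.UnionMaps(labels, k8sPod.GetMetadata().GetLabels())
	}
	return annotations, labels
}
```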
```go
if executorPod != nil {
	var customPodSpec *v1.PodSpec

	err = utils.UnmarshalStructToObj(executorPod.GetPodSpec(), &customPodSpec)
```
Consider checking if `executorPod.GetPodSpec()` is nil before attempting to unmarshal it to avoid potential nil pointer dereference.
Code suggestion (check the AI-generated fix before applying):

```diff
@@ -252,6 +252,10 @@
 	var customPodSpec *v1.PodSpec
+	podSpec := executorPod.GetPodSpec()
+	if podSpec == nil {
+		return nil, errors.Errorf(errors.BadTaskSpecification, "executor pod spec cannot be nil")
+	}
-	err = utils.UnmarshalStructToObj(executorPod.GetPodSpec(), &customPodSpec)
+	err = utils.UnmarshalStructToObj(podSpec, &customPodSpec)
```
Code Review Run #ebad2b
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
```go
func dummySparkTaskTemplateDriverExecutor(id string, sparkConf map[string]string, driverPod *core.K8SPod, executorPod *core.K8SPod) *core.TaskTemplate {
	sparkJob := dummySparkCustomObjDriverExecutor(sparkConf, driverPod, executorPod)

	structObj, err := utils.MarshalObjToStruct(sparkJob)
	if err != nil {
		panic(err)
	}

	return &core.TaskTemplate{
		Id:   &core.Identifier{Name: id},
		Type: "container",
		Target: &core.TaskTemplate_Container{
			Container: &core.Container{
				Image: testImage,
			},
		},
		Config: map[string]string{
			flytek8s.PrimaryContainerKey: "primary",
		},
		Custom: structObj,
	}
}
```
Consider extracting the common code between `dummySparkTaskTemplateContainer` and `dummySparkTaskTemplateDriverExecutor` into a shared helper function to reduce duplication. Both functions appear to create similar task templates with only minor differences.
Code suggestion (check the AI-generated fix before applying):

```go
func createSparkTaskTemplate(id string, sparkJob interface{}) *core.TaskTemplate {
	structObj, err := utils.MarshalObjToStruct(sparkJob)
	if err != nil {
		panic(err)
	}
	return &core.TaskTemplate{
		Id:   &core.Identifier{Name: id},
		Type: "container",
		Target: &core.TaskTemplate_Container{
			Container: &core.Container{
				Image: testImage,
			},
		},
		Config: map[string]string{
			flytek8s.PrimaryContainerKey: "primary",
		},
		Custom: structObj,
	}
}

func dummySparkTaskTemplateDriverExecutor(id string, sparkConf map[string]string, driverPod *core.K8SPod, executorPod *core.K8SPod) *core.TaskTemplate {
	sparkJob := dummySparkCustomObjDriverExecutor(sparkConf, driverPod, executorPod)
	return createSparkTaskTemplate(id, sparkJob)
}
```
Code Review Run #ebad2b
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
```go
assert.Equal(t, len(findEnvVarByName(sparkApp.Spec.Driver.Env, "FLYTE_MAX_ATTEMPTS").Value), 1)
assert.Equal(t, defaultConfig.DefaultEnvVars["foo"], findEnvVarByName(sparkApp.Spec.Driver.Env, "foo").Value)
assert.Equal(t, defaultConfig.DefaultEnvVars["fooEnv"], findEnvVarByName(sparkApp.Spec.Driver.Env, "fooEnv").Value)
assert.Equal(t, findEnvVarByName(dummyEnvVarsWithSecretRef, "SECRET"), findEnvVarByName(sparkApp.Spec.Driver.Env, "SECRET"))
assert.Equal(t, 9, len(sparkApp.Spec.Driver.Env))
```
Consider consolidating the environment variable assertions into a helper function to improve test readability and maintainability. Multiple similar assertions for environment variables could be simplified.
Code suggestion (check the AI-generated fix before applying):

```diff
-assert.Equal(t, len(findEnvVarByName(sparkApp.Spec.Driver.Env, "FLYTE_MAX_ATTEMPTS").Value), 1)
-assert.Equal(t, defaultConfig.DefaultEnvVars["foo"], findEnvVarByName(sparkApp.Spec.Driver.Env, "foo").Value)
-assert.Equal(t, defaultConfig.DefaultEnvVars["fooEnv"], findEnvVarByName(sparkApp.Spec.Driver.Env, "fooEnv").Value)
-assert.Equal(t, findEnvVarByName(dummyEnvVarsWithSecretRef, "SECRET"), findEnvVarByName(sparkApp.Spec.Driver.Env, "SECRET"))
-assert.Equal(t, 9, len(sparkApp.Spec.Driver.Env))
+assertEnvVars(t, sparkApp.Spec.Driver.Env, defaultConfig.DefaultEnvVars, dummyEnvVarsWithSecretRef)
```
Code Review Run #ebad2b
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
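Note that `assertEnvVars` in the suggestion is the Agent's invention. If such a consolidation were wanted, a sketch might look like the following; the fixed-length assertion (e.g. `len(...) == 9`) would still need to stay in the caller:

```go
package spark

import (
	"testing"

	"github.com/stretchr/testify/assert"
	v1 "k8s.io/api/core/v1"
)

// findEnvVarByName is restated here for self-containment; the Spark
// plugin's test file already defines an equivalent helper.
func findEnvVarByName(env []v1.EnvVar, name string) *v1.EnvVar {
	for i := range env {
		if env[i].Name == name {
			return &env[i]
		}
	}
	return nil
}

// assertEnvVars is hypothetical: it checks that every default env var
// is present with the expected value and that the secret-injected env
// var matches the reference slice.
func assertEnvVars(t *testing.T, env []v1.EnvVar, defaults map[string]string, secretEnv []v1.EnvVar) {
	for name, want := range defaults {
		assert.Equal(t, want, findEnvVarByName(env, name).Value)
	}
	assert.Equal(t, findEnvVarByName(secretEnv, "SECRET"), findEnvVarByName(env, "SECRET"))
}
```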
```go
assert.Equal(t, findEnvVarByName(dummyEnvVarsWithSecretRef, "SECRET"), findEnvVarByName(sparkApp.Spec.Executor.Env, "SECRET"))
assert.Equal(t, 9, len(sparkApp.Spec.Executor.Env))
assert.Equal(t, testImage, *sparkApp.Spec.Executor.Image)
assert.Equal(t, defaultConfig.DefaultPodSecurityContext, sparkApp.Spec.Executor.SecurityContenxt)
```
There appears to be a typo in the property name `SecurityContenxt`. Consider correcting it to `SecurityContext`.
Code suggestion (check the AI-generated fix before applying):

```diff
-assert.Equal(t, defaultConfig.DefaultPodSecurityContext, sparkApp.Spec.Executor.SecurityContenxt)
+assert.Equal(t, defaultConfig.DefaultPodSecurityContext, sparkApp.Spec.Executor.SecurityContext)
```
Code Review Run #ebad2b
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Tracking issue
#4105
Why are the changes needed?
Enable setting K8sPod separately for Spark Driver and Executor pods.
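For orientation, an illustrative sketch of what the new fields look like on the generated Go side; the real generated code in flyteidl carries protobuf tags, field numbers, and nil-safe getters:

```go
// Illustrative sketch only; not the actual generated flyteidl code.
package plugins

import core "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"

type SparkJob struct {
	// ...existing fields (application type, spark conf, etc.)...

	// Custom pod spec and metadata applied to the Spark driver pod.
	DriverPod *core.K8SPod
	// Custom pod spec and metadata applied to the Spark executor pods.
	ExecutorPod *core.K8SPod
}
```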
What changes were proposed in this pull request?
Add `driverPod` and `executorPod` fields of type K8sPod to SparkJob, and use the existing `mergePodSpecs` to merge the default podSpec with the driverPod or executorPod.

How was this patch tested?
Unit tests: I extended the existing Spark unit tests `TestBuildResourceContainer` and `TestBuildResourcePodTemplate`, and created a new test named `TestBuildResourceCustomK8SPod`.

Test with the `my_spark` example:
Modified the `@task` for the `hello_spark` function in the `my_spark` example here as follows to set the `driver_pod` and `executor_pod`, then verified the pods have the Tolerations and EnvVars set.
Related PRs
flyteorg/flytekit#3016
Summary by Bito
This PR enhances Spark task configuration by introducing separate driver and executor pod specifications through new SparkJob message fields. The implementation adds support for customizing Kubernetes pod configurations independently for the Spark driver and executor components, including tolerations, environment variables, and other pod settings. The changes encompass protobuf definitions with generated code for multiple languages and improvements to pod helper functions.

Unit tests added: True
Estimated effort to review (1-5, lower is better): 4