Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Backend + SDK): Update kfp backend and kubernetes sdk to allow enabling shared memory #10704

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,36 @@ func extendPodSpecPatch(
if len(podSpec.Containers) == 0 {
return fmt.Errorf("failed to patch the pod with kubernetes-specific config due to missing user container: %v", podSpec)
}

// Handle Shared Memory Volume if configured
if shmConfig := kubernetesExecutorConfig.GetEnabledSharedMemory(); shmConfig != nil {
// Create a volume with the name specified in the configuration
shmVolume := k8score.Volume{
Name: shmConfig.VolumeName, // Use the name from the configuration message
VolumeSource: k8score.VolumeSource{
EmptyDir: &k8score.EmptyDirVolumeSource{
Medium: k8score.StorageMediumMemory,
},
},
}

// Set the size if specified in the configuration message
if shmConfig.Size != "" {
sizeQuantity := k8sres.MustParse(shmConfig.Size) // Parse the string to a Quantity object
shmVolume.VolumeSource.EmptyDir.SizeLimit = &sizeQuantity // Assign the address of sizeQuantity
}

// Create a volume mount that mounts the shared memory volume to /dev/shm
shmVolumeMount := k8score.VolumeMount{
Name: shmVolume.Name, // Use the same name as the volume
MountPath: "/dev/shm",
}
// Append the created volume to the pod's volumes
podSpec.Volumes = append(podSpec.Volumes, shmVolume)
// Assume the user container is always the first in the list and add the volume mount
podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, shmVolumeMount)
}

// Get volume mount information
if kubernetesExecutorConfig.GetPvcMount() != nil {
volumeMounts, volumes, err := makeVolumeMountPatch(kubernetesExecutorConfig.GetPvcMount(), dag, dagTasks)
Expand Down
101 changes: 101 additions & 0 deletions backend/src/v2/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1621,3 +1621,104 @@ func Test_extendPodSpecPatch_GenericEphemeralVolume(t *testing.T) {
})
}
}

func Test_extendPodSpecPatch_SharedMemory(t *testing.T) {
tests := []struct {
name string
k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
expected *k8score.PodSpec
}{
{
"Valid - Shared Memory Enabled",
&kubernetesplatform.KubernetesExecutorConfig{
EnabledSharedMemory: &kubernetesplatform.EnabledSharedMemory{
VolumeName: "shm-vol",
Size: "2Gi",
},
},
&k8score.PodSpec{
Volumes: []k8score.Volume{
{
Name: "shm-vol",
VolumeSource: k8score.VolumeSource{
EmptyDir: &k8score.EmptyDirVolumeSource{
Medium: k8score.StorageMediumMemory,
SizeLimit: func() *k8sres.Quantity {
q := k8sres.MustParse("2Gi")
return &q
}(),
},
},
},
},
Containers: []k8score.Container{
{
Name: "main",
VolumeMounts: []k8score.VolumeMount{
{
Name: "shm-vol",
MountPath: "/dev/shm",
},
},
},
},
},
},
{
"Valid - Shared Memory Enabled without size",
&kubernetesplatform.KubernetesExecutorConfig{
EnabledSharedMemory: &kubernetesplatform.EnabledSharedMemory{
VolumeName: "shm-vol-no-size",
Size: "",
},
},
&k8score.PodSpec{
Volumes: []k8score.Volume{
{
Name: "shm-vol-no-size",
VolumeSource: k8score.VolumeSource{
EmptyDir: &k8score.EmptyDirVolumeSource{
Medium: k8score.StorageMediumMemory,
},
},
},
},
Containers: []k8score.Container{
{
Name: "main",
VolumeMounts: []k8score.VolumeMount{
{
Name: "shm-vol-no-size",
MountPath: "/dev/shm",
},
},
},
},
},
},
{
"Invalid - Shared Memory not enabled",
&kubernetesplatform.KubernetesExecutorConfig{},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := &k8score.PodSpec{Containers: []k8score.Container{
{
Name: "main",
},
}}
err := extendPodSpecPatch(got, tt.k8sExecCfg, nil, nil)
assert.Nil(t, err)
assert.Equal(t, tt.expected, got)
})
}
}
2 changes: 2 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
'use_config_map_as_volume',
'use_secret_as_env',
'use_secret_as_volume',
'enable_shared_memory',
]

from kfp.kubernetes.config_map import use_config_map_as_env
Expand All @@ -51,3 +52,4 @@
from kfp.kubernetes.volume import CreatePVC
from kfp.kubernetes.volume import DeletePVC
from kfp.kubernetes.volume import mount_pvc
from kfp.kubernetes.shared_memory import enable_shared_memory
49 changes: 49 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/shared_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.protobuf import json_format
from kfp.dsl import PipelineTask
from kfp.kubernetes import common
from kfp.kubernetes import kubernetes_executor_config_pb2 as pb


def enable_shared_memory(
task: PipelineTask, volume_name: str = "shm", size: str = ""
) -> PipelineTask:
"""Add shared memory configuration to the task's Kubernetes configuration.

This function adds a shared memory volume with optional name and size
parameters to the task's Kubernetes Executor config. Size should be
specified in standard Kubernetes format (e.g., '1Gi', '500Mi').

Args:
task: Pipeline task.
volume_name: Name of the shared memory volume, defaults to 'shm'.
size: Size of the shared memory, defaults to an empty string, ''.

Returns:
Task object with updated shared memory configuration.
"""

# get existing k8s config
msg = common.get_existing_kubernetes_config_as_message(task)

# set new values
msg.enabled_shared_memory.size = size if size is not None else ""
msg.enabled_shared_memory.volume_name = volume_name

# update task specific k8s config
task.platform_config["kubernetes"] = json_format.MessageToDict(msg)

return task
33 changes: 33 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/shared_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
pass


@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.enable_shared_memory(task, volume_name="Random-Name", size="2Gi")


if __name__ == "__main__":
from kfp import compiler

compiler.Compiler().compile(my_pipeline, __file__.replace(".py", ".yaml"))
54 changes: 54 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/shared_memory.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# PIPELINE DEFINITION
# Name: my-pipeline
components:
comp-comp:
executorLabel: exec-comp
deploymentSpec:
executors:
exec-comp:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- comp
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef comp():\n pass\n\n"
image: python:3.7
pipelineInfo:
name: my-pipeline
root:
dag:
tasks:
comp:
cachingOptions:
enableCache: true
componentRef:
name: comp-comp
taskInfo:
name: comp
schemaVersion: 2.1.0
sdkVersion: kfp-2.7.0
---
platforms:
kubernetes:
deploymentSpec:
executors:
exec-comp:
enabledSharedMemory:
size: 2Gi
volumeName: Random-Name
Loading
Loading