Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support node selector in the API server #2523

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apiserver/pkg/model/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,15 @@ func FromKubeToAPIComputeTemplate(configMap *corev1.ConfigMap) *api.ComputeTempl
}
}

val, ok = configMap.Data["node_selector"]
if ok {
err := json.Unmarshal([]byte(val), &runtime.NodeSelector)
if err != nil {
klog.Error("failed to unmarshall node selector for compute template ", runtime.Name, " value ",
runtime.ExtendedResources, " error ", err)
}
}

val, ok = configMap.Data["tolerations"]
if ok {
err := json.Unmarshal([]byte(val), &runtime.Tolerations)
Expand Down
35 changes: 35 additions & 0 deletions apiserver/pkg/model/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,19 @@ var configMapWithTolerations = corev1.ConfigMap{
},
}

// configMapWithNodeSelector is a compute-template config map that carries a
// node_selector entry alongside the standard resource fields, used to verify
// node selector round-tripping in FromKubeToAPIComputeTemplate.
var configMapWithNodeSelector = corev1.ConfigMap{
	Data: map[string]string{
		"name":               "head-node-template",
		"namespace":          "max",
		"cpu":                "4",
		"memory":             "8",
		"gpu":                "0",
		"gpu_accelerator":    "",
		"extended_resources": "{\"vpc.amazonaws.com/efa\": 32}",
		"node_selector":      "{\"nvidia.com/gpu.product\": \"Tesla-V100-PCIE-16GB\", \"kubernetes.io/hostname\": \"cpu15\"}",
	},
}

var workerSpecTest = rayv1api.WorkerGroupSpec{
GroupName: "",
Replicas: &workerReplicas,
Expand Down Expand Up @@ -502,6 +515,11 @@ var expectedTolerations = api.PodToleration{
Effect: "NoExecute",
}

// expectedNodeSelector is the decoded form of the node_selector JSON stored in
// configMapWithNodeSelector.
var expectedNodeSelector = map[string]string{
	"kubernetes.io/hostname": "cpu15",
	"nvidia.com/gpu.product": "Tesla-V100-PCIE-16GB",
}

func TestPopulateHeadNodeSpec(t *testing.T) {
groupSpec := PopulateHeadNodeSpec(headSpecTest)

Expand Down Expand Up @@ -615,8 +633,14 @@ func TestPopulateTemplate(t *testing.T) {
if len(template.Tolerations) != 0 {
t.Errorf("failed to convert config map, expected no tolerations, got %d", len(template.Tolerations))
}
if len(template.NodeSelector) != 0 {
t.Errorf("failed to convert config map, expected no node selector, got %d", len(template.NodeSelector))
}

template = FromKubeToAPIComputeTemplate(&configMapWithTolerations)
if len(template.NodeSelector) != 0 {
t.Errorf("failed to convert config map, expected no node selector, got %d", len(template.NodeSelector))
}
if len(template.Tolerations) != 1 {
t.Errorf("failed to convert config map, expected 1 toleration, got %d", len(template.Tolerations))
}
Expand All @@ -627,6 +651,17 @@ func TestPopulateTemplate(t *testing.T) {
tolerationToString(&expectedTolerations))
}

template = FromKubeToAPIComputeTemplate(&configMapWithNodeSelector)
if len(template.Tolerations) != 0 {
t.Errorf("failed to convert config map, expected no tolerations, got %d", len(template.Tolerations))
}
if len(template.NodeSelector) != 2 {
t.Errorf("failed to convert config map, expected 1 node selector got %d", len(template.NodeSelector))
}
if !reflect.DeepEqual(template.NodeSelector, expectedNodeSelector) {
t.Errorf("failed to convert node selector, got %v, expected %v", template.NodeSelector, expectedNodeSelector)
}

assert.Equal(t, uint32(4), template.Cpu, "CPU mismatch")
assert.Equal(t, uint32(8), template.Memory, "Memory mismatch")
assert.Equal(t, uint32(0), template.Gpu, "GPU mismatch")
Expand Down
38 changes: 30 additions & 8 deletions apiserver/pkg/util/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ func buildHeadPodTemplate(imageVersion string, envs *api.EnvironmentVariables, s
Labels: map[string]string{},
},
Spec: corev1.PodSpec{
Tolerations: []corev1.Toleration{},
Tolerations: []corev1.Toleration{},
NodeSelector: map[string]string{},
Containers: []corev1.Container{
{
Name: "ray-head",
Expand Down Expand Up @@ -297,7 +298,7 @@ func buildHeadPodTemplate(imageVersion string, envs *api.EnvironmentVariables, s
}
}

// Add specific tollerations
// Add pod tollerations
if computeRuntime.Tolerations != nil {
for _, t := range computeRuntime.Tolerations {
podTemplateSpec.Spec.Tolerations = append(podTemplateSpec.Spec.Tolerations, corev1.Toleration{
Expand All @@ -306,6 +307,13 @@ func buildHeadPodTemplate(imageVersion string, envs *api.EnvironmentVariables, s
}
}

// Add node selector
if computeRuntime.NodeSelector != nil {
for k, v := range computeRuntime.NodeSelector {
podTemplateSpec.Spec.NodeSelector[k] = v
}
}

// If service account is specified, add it to the pod spec.
if len(spec.ServiceAccount) > 1 {
podTemplateSpec.Spec.ServiceAccountName = spec.ServiceAccount
Expand All @@ -329,15 +337,15 @@ func convertEnvironmentVariables(envs *api.EnvironmentVariables) []corev1.EnvVar
if envs == nil {
return converted
}
if envs.Values != nil && len(envs.Values) > 0 {
if len(envs.Values) > 0 {
// Add values
for key, value := range envs.Values {
converted = append(converted, corev1.EnvVar{
Name: key, Value: value,
})
}
}
if envs.ValuesFrom != nil && len(envs.ValuesFrom) > 0 {
if len(envs.ValuesFrom) > 0 {
// Add values ref
for key, value := range envs.ValuesFrom {
switch value.Source {
Expand Down Expand Up @@ -447,7 +455,8 @@ func buildWorkerPodTemplate(imageVersion string, envs *api.EnvironmentVariables,
Labels: map[string]string{},
},
Spec: corev1.PodSpec{
Tolerations: []corev1.Toleration{},
Tolerations: []corev1.Toleration{},
NodeSelector: map[string]string{},
Containers: []corev1.Container{
{
Name: "ray-worker",
Expand Down Expand Up @@ -591,7 +600,7 @@ func buildWorkerPodTemplate(imageVersion string, envs *api.EnvironmentVariables,
}
}

// Add specific tollerations
// Add pod tollerations
if computeRuntime.Tolerations != nil {
for _, t := range computeRuntime.Tolerations {
podTemplateSpec.Spec.Tolerations = append(podTemplateSpec.Spec.Tolerations, corev1.Toleration{
Expand All @@ -600,6 +609,13 @@ func buildWorkerPodTemplate(imageVersion string, envs *api.EnvironmentVariables,
}
}

// Add node selector
if computeRuntime.NodeSelector != nil {
for k, v := range computeRuntime.NodeSelector {
podTemplateSpec.Spec.NodeSelector[k] = v
}
}

// If service account is specified, add it to the pod spec.
if len(spec.ServiceAccount) > 1 {
podTemplateSpec.Spec.ServiceAccountName = spec.ServiceAccount
Expand Down Expand Up @@ -847,6 +863,11 @@ func NewComputeTemplate(runtime *api.ComputeTemplate) (*corev1.ConfigMap, error)
return nil, fmt.Errorf("failed to marshal extended resources: %v", err)
}

nodeSelectorJSON, err := json.Marshal(runtime.NodeSelector)
if err != nil {
return nil, fmt.Errorf("failed to marshal extended resources: %v", err)
}

// Create data map
dmap := map[string]string{
"name": runtime.Name,
Expand All @@ -856,9 +877,10 @@ func NewComputeTemplate(runtime *api.ComputeTemplate) (*corev1.ConfigMap, error)
"gpu": strconv.FormatUint(uint64(runtime.Gpu), 10),
"gpu_accelerator": runtime.GpuAccelerator,
"extended_resources": string(extendedResourcesJSON),
"node_selector": string(nodeSelectorJSON),
}
// Add tolerations in defined
if runtime.Tolerations != nil && len(runtime.Tolerations) > 0 {
if len(runtime.Tolerations) > 0 {
t, err := json.Marshal(runtime.Tolerations)
if err != nil {
return nil, fmt.Errorf("failed to marshal tolerations for compute template %s: %w", runtime.Name, err)
Expand Down Expand Up @@ -945,7 +967,7 @@ func buildAutoscalerOptions(autoscalerOptions *api.AutoscalerOptions) (*rayv1api
}
}
}
if autoscalerOptions.Volumes != nil && len(autoscalerOptions.Volumes) > 0 {
if len(autoscalerOptions.Volumes) > 0 {
options.VolumeMounts = buildVolumeMounts(autoscalerOptions.Volumes)
}
if len(autoscalerOptions.Cpu) > 0 || len(autoscalerOptions.Memory) > 0 {
Expand Down
44 changes: 44 additions & 0 deletions apiserver/pkg/util/cluster_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package util

import (
"encoding/json"
"reflect"
"sort"
"testing"
Expand Down Expand Up @@ -253,6 +254,24 @@ var template = api.ComputeTemplate{
},
}

// templateWithNS is a compute template carrying a node selector (and a single
// toleration) so that both placement mechanisms can be exercised together in
// config-map and pod-template construction tests.
var templateWithNS = api.ComputeTemplate{
	Name:      "nodeselector",
	Namespace: "default",
	Cpu:       2,
	Memory:    8,
	Tolerations: []*api.PodToleration{
		{
			Key:      "blah1",
			Operator: "Exists",
			Effect:   "NoExecute",
		},
	},
	NodeSelector: map[string]string{
		"kubernetes.io/hostname": "cpu15",
		"nvidia.com/gpu.product": "Tesla-V100-PCIE-16GB",
	},
}

var templateWorker = api.ComputeTemplate{
Name: "",
Namespace: "",
Expand Down Expand Up @@ -341,6 +360,22 @@ var expectedSecurityContext = corev1.SecurityContext{
},
}

// TestBuildComputeTemplate verifies that NewComputeTemplate serializes the
// template's node selector into the config map's "node_selector" key as JSON.
func TestBuildComputeTemplate(t *testing.T) {
	cmap, err := NewComputeTemplate(&templateWithNS)
	// Previously the error was discarded; a nil config map here would panic below.
	if err != nil {
		t.Fatalf("failed to build compute template config map, error %v", err)
	}
	selector := cmap.Data["node_selector"]
	// Decode into map[string]string so lookups need no type assertions
	// (the old map[string]interface{} version panicked on a missing key).
	var nodeSelector map[string]string
	if err := json.Unmarshal([]byte(selector), &nodeSelector); err != nil {
		// Fatal: continuing with a nil map would only produce bogus follow-up failures.
		t.Fatalf("failed to unmarshall config map node selector %s, error %v", selector, err)
	}
	if got := nodeSelector["nvidia.com/gpu.product"]; got != "Tesla-V100-PCIE-16GB" {
		t.Errorf("failed to convert config map, expected node selector Tesla-V100-PCIE-16GB, got %s", got)
	}
	// The original message here was a copy-paste of the gpu.product check;
	// report the hostname key and expected value actually being tested.
	if got := nodeSelector["kubernetes.io/hostname"]; got != "cpu15" {
		t.Errorf("failed to convert config map, expected node selector cpu15, got %s", got)
	}
}

func TestBuildVolumes(t *testing.T) {
targetVolume := corev1.Volume{
Name: testVolume.Name,
Expand Down Expand Up @@ -597,6 +632,15 @@ func TestBuildHeadPodTemplate(t *testing.T) {
if len(podSpec.Spec.Containers[0].Ports) != 6 {
t.Errorf("failed build ports")
}
if len(podSpec.Spec.NodeSelector) != 0 {
t.Errorf("failed build Node selector")
}

podSpec, err = buildHeadPodTemplate("2.4", &api.EnvironmentVariables{}, &headGroup, &templateWithNS, false)
assert.Nil(t, err)
if len(podSpec.Spec.NodeSelector) != 2 {
t.Errorf("failed build Node selector")
}
}

func TestConvertAutoscalerOptions(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class Template:
gpu_accelerator - optional, if not defined nvidia.com/gpu is assumed
extended_resources - optional, name and number of the extended resources
tolerations - optional, tolerations for pod placing, default none
node_selector - optional, node selector for pod placing, default none
- to_string() -> str: convert toleration to string for printing
- to_dict() -> dict[str, Any] convert to dict
- to_json() -> str convert to json string
Expand All @@ -109,6 +110,7 @@ def __init__(
gpu_accelerator: str = None,
extended_resources: dict[str, int] = None,
tolerations: list[Toleration] = None,
node_selector: dict[str, str] = None,
):
"""
Initialization
Expand All @@ -120,6 +122,7 @@ def __init__(
:param gpu_accelerator: accelerator type
:param extended_resources: extended resources
:param tolerations: tolerations
:param node_selector: node selector
"""
self.name = name
self.namespace = namespace
Expand All @@ -129,6 +132,7 @@ def __init__(
self.gpu_accelerator = gpu_accelerator
self.extended_resources = extended_resources
self.tolerations = tolerations
self.node_selector = node_selector

def to_string(self) -> str:
"""
Expand All @@ -142,6 +146,8 @@ def to_string(self) -> str:
val = val + f", gpu accelerator {self.gpu_accelerator}"
if self.extended_resources is not None:
val = val + f", extended resources {self.extended_resources}"
if self.node_selector is not None:
val = val + f", node selector {self.node_selector}"
if self.tolerations is None:
return val
val = val + ", tolerations ["
Expand All @@ -163,9 +169,11 @@ def to_dict(self) -> dict[str, Any]:
if self.gpu > 0:
dct["gpu"] = self.gpu
if self.gpu_accelerator is not None:
dct["gpu accelerator"] = self.gpu_accelerator
dct["gpu_accelerator"] = self.gpu_accelerator
if self.extended_resources is not None:
dct["extended resources"] = self.extended_resources
dct["extended_resources"] = self.extended_resources
if self.node_selector is not None:
dct["node_selector"] = self.node_selector
if self.tolerations is not None:
dct["tolerations"] = [tl.to_dict() for tl in self.tolerations]
return dct
Expand Down Expand Up @@ -206,8 +214,9 @@ def template_decoder(dct: dict[str, Any]) -> Template:
cpu=int(dct.get("cpu", "0")),
memory=int(dct.get("memory", "0")),
gpu=int(dct.get("gpu", "0")),
gpu_accelerator=dct.get("gpu_accelerator"),
extended_resources=dct.get("extended_resources"),
gpu_accelerator=dct.get("gpuAccelerator"),
extended_resources=dct.get("extendedResources"),
node_selector=dct.get("nodeSelector"),
tolerations=tolerations,
)

Expand Down
19 changes: 14 additions & 5 deletions clients/python-apiserver-client/test/api_params_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,29 @@ def test_templates():
tm1_json = json.dumps(temp1.to_dict())
print(f"template 1 JSON: {tm1_json}")

temp2 = Template(name="template2", namespace="namespace", cpu=2, memory=8, gpu=1)
temp2 = Template(name="template2", namespace="namespace", cpu=2, memory=8, gpu=1, gpu_accelerator="nvidia")
print(f"template 2: {temp2.to_string()}")
tm2_json = json.dumps(temp2.to_dict())
print(f"template 2 JSON: {tm2_json}")

temp3 = Template(name="template3", namespace="namespace", cpu=2, memory=8, gpu=1, extended_resources={"vpc.amazonaws.com/efa": 32})
temp3 = Template(name="template3", namespace="namespace", cpu=2, memory=8, gpu=1,
extended_resources={"vpc.amazonaws.com/efa": 32})
print(f"template 3: {temp3.to_string()}")
tm3_json = json.dumps(temp3.to_dict())
print(f"template 3 JSON: {tm3_json}")

assert temp1.to_string() == template_decoder(json.loads(tm1_json)).to_string()
assert temp2.to_string() == template_decoder(json.loads(tm2_json)).to_string()
assert temp3.to_string() == template_decoder(json.loads(tm3_json)).to_string()
temp4 = Template(name="template3", namespace="namespace", cpu=2, memory=8, gpu=1,
node_selector={"nvidia.com/gpu.product": "NVIDIA-A100-80GB-PCIe",
"kubernetes.io/hostname": "cpu15"})
print(f"template 4: {temp4.to_string()}")
tm4_json = json.dumps(temp4.to_dict())
print(f"template 4 JSON: {tm4_json}")

assert temp1.to_string() == template_decoder(json.loads(tm1_json)).to_string()
# These are commented out as the real cluster replaces params with _ to a CamelCase
# assert temp2.to_string() == template_decoder(json.loads(tm2_json)).to_string()
# assert temp3.to_string() == template_decoder(json.loads(tm3_json)).to_string()
# assert temp4.to_string() == template_decoder(json.loads(tm4_json)).to_string()

def test_volumes():

Expand Down
4 changes: 3 additions & 1 deletion clients/python-apiserver-client/test/kuberay_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ def test_templates():
_, _ = apis.delete_compute_template(ns="default", name="default-template")
# create
toleration = Toleration(key="blah1", operator=TolerationOperation.Exists, effect=TolerationEffect.NoExecute)
template = Template(name="default-template", namespace="default", cpu=2, memory=8, gpu=1, extended_resources={"vpc.amazonaws.com/efa": 32}, tolerations=[toleration])
template = Template(name="default-template", namespace="default", cpu=2, memory=8, gpu=1,
gpu_accelerator="nvidia.com/gpu", extended_resources={"vpc.amazonaws.com/efa": 32},
tolerations=[toleration], node_selector={"nvidia.com/gpu.product": "NVIDIA-A100-80GB-PCIe"})
status, error = apis.create_compute_template(template)
assert status == 200
assert error is None
Expand Down
1 change: 1 addition & 0 deletions proto/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ message ComputeTemplate {
repeated PodToleration tolerations = 7;
// Optional. Name and number of the extended resources
map<string, uint32> extended_resources = 8;
map<string, string> node_selector = 9;
}

// This service is not implemented.
Expand Down
2 changes: 1 addition & 1 deletion proto/go_client/cluster.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading