diff --git a/internal/batchscheduler/volcano/volcano.go b/internal/batchscheduler/volcano/volcano.go index 8aad9a92..90902fa5 100644 --- a/internal/batchscheduler/volcano/volcano.go +++ b/internal/batchscheduler/volcano/volcano.go @@ -22,6 +22,7 @@ import ( "golang.org/x/net/context" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/clientcmd" scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1" @@ -144,7 +145,7 @@ func (v *VolcanoBatchScheduler) syncPodGroup( return nil, nil } - minResource, size := getClusterResource(state) + resourceRequirements, size := getClusterResource(state) pg, err := v.getPodGroup(podGroupName, namespace) if err != nil { if !errors.IsNotFound(err) { @@ -158,7 +159,7 @@ func (v *VolcanoBatchScheduler) syncPodGroup( }, Spec: scheduling.PodGroupSpec{ MinMember: size, - MinResources: &minResource, + MinResources: buildMinResource(resourceRequirements), Queue: options.Queue, PriorityClassName: options.PriorityClassName, }, @@ -174,15 +175,18 @@ func (v *VolcanoBatchScheduler) syncPodGroup( return pg, nil } -func getClusterResource(state *model.DesiredClusterState) (corev1.ResourceList, int32) { - resource := corev1.ResourceList{} +func getClusterResource(state *model.DesiredClusterState) (*corev1.ResourceRequirements, int32) { + reqs := &corev1.ResourceRequirements{ + Limits: map[corev1.ResourceName]resource.Quantity{}, + Requests: map[corev1.ResourceName]resource.Quantity{}, + } var size int32 if state.JmStatefulSet != nil { size += *state.JmStatefulSet.Spec.Replicas for i := int32(0); i < *state.JmStatefulSet.Spec.Replicas; i++ { jmResource := getPodResource(&state.JmStatefulSet.Spec.Template.Spec) - addResourceList(resource, jmResource, nil) + addResourceRequirements(reqs, jmResource) } } @@ -190,43 +194,62 @@ func getClusterResource(state *model.DesiredClusterState) (corev1.ResourceList, size += *state.TmStatefulSet.Spec.Replicas for i := int32(0); i < *state.TmStatefulSet.Spec.Replicas; i++ { tmResource := getPodResource(&state.TmStatefulSet.Spec.Template.Spec) - addResourceList(resource, tmResource, nil) + addResourceRequirements(reqs, tmResource) } } if state.Job != nil { size += 1 jobResource := getPodResource(&state.Job.Spec.Template.Spec) - addResourceList(resource, jobResource, nil) + addResourceRequirements(reqs, jobResource) } - return resource, size + return reqs, size } -func getPodResource(spec *corev1.PodSpec) corev1.ResourceList { - resource := corev1.ResourceList{} +func getPodResource(spec *corev1.PodSpec) *corev1.ResourceRequirements { + reqs := &corev1.ResourceRequirements{ + Limits: map[corev1.ResourceName]resource.Quantity{}, + Requests: map[corev1.ResourceName]resource.Quantity{}, + } for _, container := range spec.Containers { - addResourceList(resource, container.Resources.Requests, container.Resources.Limits) + addResourceRequirements(reqs, &container.Resources) } - return resource + return reqs } -func addResourceList(list, req, limit corev1.ResourceList) { - for name, quantity := range req { - - if value, ok := list[name]; !ok { - list[name] = quantity.DeepCopy() +func addResourceRequirements(acc, req *corev1.ResourceRequirements) { + for name, quantity := range req.Requests { + if value, ok := acc.Requests[name]; !ok { + acc.Requests[name] = quantity.DeepCopy() } else { value.Add(quantity) - list[name] = value + acc.Requests[name] = value } } - // If Requests is omitted for a container, - // it defaults to Limits if that is explicitly specified. - for name, quantity := range limit { - if _, ok := list[name]; !ok { - list[name] = quantity.DeepCopy() + for name, quantity := range req.Limits { + if value, ok := acc.Limits[name]; !ok { + acc.Limits[name] = quantity.DeepCopy() + } else { + value.Add(quantity) + acc.Limits[name] = value } } } + +func buildMinResource(req *corev1.ResourceRequirements) *corev1.ResourceList { + minResource := corev1.ResourceList{} + for name, quantity := range req.Requests { + minResource[name] = quantity.DeepCopy() + n := corev1.ResourceName(fmt.Sprintf("requests.%s", name)) + minResource[n] = quantity.DeepCopy() + } + + for name, quantity := range req.Limits { + n := corev1.ResourceName(fmt.Sprintf("limits.%s", name)) + minResource[n] = quantity.DeepCopy() + } + + return &minResource +} diff --git a/internal/batchscheduler/volcano/volcano_test.go b/internal/batchscheduler/volcano/volcano_test.go index 08df148d..199f2e88 100644 --- a/internal/batchscheduler/volcano/volcano_test.go +++ b/internal/batchscheduler/volcano/volcano_test.go @@ -287,6 +287,6 @@ func TestGetClusterResource(t *testing.T) { res, size := getClusterResource(desiredState) assert.Assert(t, size == 5) - assert.Assert(t, res.Memory().String() == "2304Mi") - assert.Assert(t, res.Cpu().MilliValue() == 900) + assert.Assert(t, res.Requests.Memory().String() == "2304Mi") + assert.Assert(t, res.Requests.Cpu().MilliValue() == 900) }