Skip to content

Commit

Permalink
Rework volcano PodGroup MinResources to reflect limits and requests (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Mar 28, 2022
1 parent 9d7cccd commit 0296594
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 25 deletions.
69 changes: 46 additions & 23 deletions internal/batchscheduler/volcano/volcano.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"
scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
Expand Down Expand Up @@ -144,7 +145,7 @@ func (v *VolcanoBatchScheduler) syncPodGroup(
return nil, nil
}

minResource, size := getClusterResource(state)
resourceRequirements, size := getClusterResource(state)
pg, err := v.getPodGroup(podGroupName, namespace)
if err != nil {
if !errors.IsNotFound(err) {
Expand All @@ -158,7 +159,7 @@ func (v *VolcanoBatchScheduler) syncPodGroup(
},
Spec: scheduling.PodGroupSpec{
MinMember: size,
MinResources: &minResource,
MinResources: buildMinResource(resourceRequirements),
Queue: options.Queue,
PriorityClassName: options.PriorityClassName,
},
Expand All @@ -174,59 +175,81 @@ func (v *VolcanoBatchScheduler) syncPodGroup(
return pg, nil
}

func getClusterResource(state *model.DesiredClusterState) (corev1.ResourceList, int32) {
resource := corev1.ResourceList{}
func getClusterResource(state *model.DesiredClusterState) (*corev1.ResourceRequirements, int32) {
reqs := &corev1.ResourceRequirements{
Limits: map[corev1.ResourceName]resource.Quantity{},
Requests: map[corev1.ResourceName]resource.Quantity{},
}
var size int32

if state.JmStatefulSet != nil {
size += *state.JmStatefulSet.Spec.Replicas
for i := int32(0); i < *state.JmStatefulSet.Spec.Replicas; i++ {
jmResource := getPodResource(&state.JmStatefulSet.Spec.Template.Spec)
addResourceList(resource, jmResource, nil)
addResourceRequirements(reqs, jmResource)
}
}

if state.TmStatefulSet != nil {
size += *state.TmStatefulSet.Spec.Replicas
for i := int32(0); i < *state.TmStatefulSet.Spec.Replicas; i++ {
tmResource := getPodResource(&state.TmStatefulSet.Spec.Template.Spec)
addResourceList(resource, tmResource, nil)
addResourceRequirements(reqs, tmResource)
}
}

if state.Job != nil {
size += 1
jobResource := getPodResource(&state.Job.Spec.Template.Spec)
addResourceList(resource, jobResource, nil)
addResourceRequirements(reqs, jobResource)
}

return resource, size
return reqs, size
}

func getPodResource(spec *corev1.PodSpec) corev1.ResourceList {
resource := corev1.ResourceList{}
func getPodResource(spec *corev1.PodSpec) *corev1.ResourceRequirements {
reqs := &corev1.ResourceRequirements{
Limits: map[corev1.ResourceName]resource.Quantity{},
Requests: map[corev1.ResourceName]resource.Quantity{},
}
for _, container := range spec.Containers {
addResourceList(resource, container.Resources.Requests, container.Resources.Limits)
addResourceRequirements(reqs, &container.Resources)
}
return resource
return reqs
}

func addResourceList(list, req, limit corev1.ResourceList) {
for name, quantity := range req {

if value, ok := list[name]; !ok {
list[name] = quantity.DeepCopy()
func addResourceRequirements(acc, req *corev1.ResourceRequirements) {
for name, quantity := range req.Requests {
if value, ok := acc.Requests[name]; !ok {
acc.Requests[name] = quantity.DeepCopy()
} else {
value.Add(quantity)
list[name] = value
acc.Requests[name] = value
}
}

// If Requests is omitted for a container,
// it defaults to Limits if that is explicitly specified.
for name, quantity := range limit {
if _, ok := list[name]; !ok {
list[name] = quantity.DeepCopy()
for name, quantity := range req.Limits {
if value, ok := acc.Limits[name]; !ok {
acc.Limits[name] = quantity.DeepCopy()
} else {
value.Add(quantity)
acc.Limits[name] = value
}
}
}

func buildMinResource(req *corev1.ResourceRequirements) *corev1.ResourceList {
minResource := corev1.ResourceList{}
for name, quantity := range req.Requests {
minResource[name] = quantity.DeepCopy()
n := corev1.ResourceName(fmt.Sprintf("requests.%s", name))
minResource[n] = quantity.DeepCopy()
}

for name, quantity := range req.Limits {
n := corev1.ResourceName(fmt.Sprintf("limits.%s", name))
minResource[n] = quantity.DeepCopy()
}

return &minResource
}
4 changes: 2 additions & 2 deletions internal/batchscheduler/volcano/volcano_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,6 @@ func TestGetClusterResource(t *testing.T) {

res, size := getClusterResource(desiredState)
assert.Assert(t, size == 5)
assert.Assert(t, res.Memory().String() == "2304Mi")
assert.Assert(t, res.Cpu().MilliValue() == 900)
assert.Assert(t, res.Requests.Memory().String() == "2304Mi")
assert.Assert(t, res.Requests.Cpu().MilliValue() == 900)
}

0 comments on commit 0296594

Please sign in to comment.