Merge pull request kubernetes#42598 from kubernetes/revert-41870-test-out-of-oir

Automatic merge from submit-queue (batch tested with PRs 42080, 41653, 42598, 42555)

Revert "Pods pending due to insufficient OIR should get scheduled once sufficient OIR becomes available."

Reverts kubernetes#41870 to stop the bleeding; see kubernetes#42597.

cc/ @ConnorDoyle @kubernetes/release-team 

Connor, if there is a pending PR to fix the issue, please point it out and we can close this one. Otherwise, I would like to revert the PR first; you can resubmit the fix. Thanks!
Kubernetes Submit Queue authored Mar 7, 2017
2 parents 73c5d6c + 60758f3 commit 8e52bec
Showing 4 changed files with 37 additions and 132 deletions.
8 changes: 0 additions & 8 deletions pkg/kubelet/kubelet_node_status.go
@@ -529,14 +529,6 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
if node.Status.Allocatable == nil {
node.Status.Allocatable = make(v1.ResourceList)
}
// Remove opaque integer resources from allocatable that are no longer
// present in capacity.
for k := range node.Status.Allocatable {
_, found := node.Status.Capacity[k]
if !found && v1.IsOpaqueIntResourceName(k) {
delete(node.Status.Allocatable, k)
}
}
allocatableReservation := kl.containerManager.GetNodeAllocatableReservation()
for k, v := range node.Status.Capacity {
value := *(v.Copy())
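For orientation, the eight deleted lines above pruned a node's allocatable list of opaque resources that no longer appear in its capacity. A standalone sketch of that pruning, using plain string maps in place of v1.ResourceList and an assumed opaque-resource name prefix:

```go
package main

import (
	"fmt"
	"strings"
)

// isOpaqueIntResourceName is a simplified stand-in for
// v1.IsOpaqueIntResourceName; the prefix is assumed for illustration.
func isOpaqueIntResourceName(name string) bool {
	return strings.HasPrefix(name, "pod.alpha.kubernetes.io/opaque-int-resource-")
}

func main() {
	capacity := map[string]string{
		"cpu": "4",
		"pod.alpha.kubernetes.io/opaque-int-resource-foo": "5",
	}
	allocatable := map[string]string{
		"cpu": "4",
		"pod.alpha.kubernetes.io/opaque-int-resource-foo":   "5",
		"pod.alpha.kubernetes.io/opaque-int-resource-stale": "2",
	}

	// Drop opaque keys from allocatable that no longer appear in capacity.
	// Deleting map entries while ranging over the map is safe in Go.
	for k := range allocatable {
		if _, found := capacity[k]; !found && isOpaqueIntResourceName(k) {
			delete(allocatable, k)
		}
	}
	fmt.Println(allocatable) // the "stale" opaque resource has been pruned
}
```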
28 changes: 3 additions & 25 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -468,30 +468,6 @@ func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta interface{}, nodeInfo *s
return true, nil, nil
}

// Returns a *schedulercache.Resource that covers the largest width in each
// resource dimension. Because init-containers run sequentially, we collect the
// max in each dimension iteratively. In contrast, we sum the resource vectors
// for regular containers since they run simultaneously.
//
// Example:
//
// Pod:
// InitContainers
// IC1:
// CPU: 2
// Memory: 1G
// IC2:
// CPU: 2
// Memory: 3G
// Containers
// C1:
// CPU: 2
// Memory: 1G
// C2:
// CPU: 1
// Memory: 1G
//
// Result: CPU: 3, Memory: 3G
func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
result := schedulercache.Resource{}
for _, container := range pod.Spec.Containers {
@@ -529,8 +505,10 @@ func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
default:
if v1.IsOpaqueIntResourceName(rName) {
value := rQuantity.Value()
// Ensure the opaque resource map is initialized in the result.
result.AddOpaque(rName, int64(0))
if value > result.OpaqueIntResources[rName] {
result.SetOpaque(rName, value)
result.OpaqueIntResources[rName] = value
}
}
}
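The doc comment deleted in the first hunk explained how GetResourceRequest aggregates a pod's requests: regular containers run concurrently, so their requests are summed, while each init container runs sequentially and is folded in with a per-dimension max. A self-contained sketch reproducing the comment's worked example, with plain int64s rather than the scheduler's Resource type:

```go
package main

import "fmt"

func max64(a, b int64) int64 {
	if a > b {
		return a
	}
	return b
}

func main() {
	// From the deleted comment's example: two init containers and two
	// regular containers (CPU in cores, memory in GiB).
	initCPU, initMem := []int64{2, 2}, []int64{1, 3}
	ctrCPU, ctrMem := []int64{2, 1}, []int64{1, 1}

	var cpu, mem int64
	// Regular containers run simultaneously: sum their requests.
	for i := range ctrCPU {
		cpu += ctrCPU[i]
		mem += ctrMem[i]
	}
	// Init containers run sequentially: take the max in each dimension.
	for i := range initCPU {
		cpu = max64(cpu, initCPU[i])
		mem = max64(mem, initMem[i])
	}
	fmt.Printf("CPU: %d, Memory: %dG\n", cpu, mem) // CPU: 3, Memory: 3G
}
```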
8 changes: 2 additions & 6 deletions plugin/pkg/scheduler/schedulercache/node_info.go
@@ -83,15 +83,11 @@ func (r *Resource) ResourceList() v1.ResourceList {
}

func (r *Resource) AddOpaque(name v1.ResourceName, quantity int64) {
r.SetOpaque(name, r.OpaqueIntResources[name]+quantity)
}

func (r *Resource) SetOpaque(name v1.ResourceName, quantity int64) {
// Lazily allocate opaque integer resource map.
if r.OpaqueIntResources == nil {
r.OpaqueIntResources = map[v1.ResourceName]int64{}
}
r.OpaqueIntResources[name] = quantity
r.OpaqueIntResources[name] += quantity
}

// NewNodeInfo returns a ready to use empty NodeInfo object.
@@ -337,7 +333,7 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
n.allowedPodNumber = int(rQuant.Value())
default:
if v1.IsOpaqueIntResourceName(rName) {
n.allocatableResource.SetOpaque(rName, rQuant.Value())
n.allocatableResource.AddOpaque(rName, rQuant.Value())
}
}
}
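After the revert, AddOpaque both lazily allocates the opaque resource map and accumulates quantities into it; the deleted SetOpaque variant overwrote instead. A minimal sketch of the restored accumulate semantics, with hypothetical standalone types rather than the scheduler's:

```go
package main

import "fmt"

type resourceName string

// addOpaque mirrors the post-revert AddOpaque: lazily allocate the map,
// then accumulate, so repeated adds for one resource sum their quantities.
func addOpaque(m map[resourceName]int64, name resourceName, quantity int64) map[resourceName]int64 {
	if m == nil {
		m = map[resourceName]int64{}
	}
	m[name] += quantity
	return m
}

func main() {
	var opaque map[resourceName]int64 // nil until the first add
	opaque = addOpaque(opaque, "pod.alpha.kubernetes.io/opaque-int-resource-foo", 3)
	opaque = addOpaque(opaque, "pod.alpha.kubernetes.io/opaque-int-resource-foo", 2)
	fmt.Println(opaque) // quantity is 5: accumulated, not overwritten
}
```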
125 changes: 32 additions & 93 deletions test/e2e/opaque_resource.go
@@ -38,7 +38,7 @@ import (
. "github.com/onsi/gomega"
)

var _ = framework.KubeDescribe("Opaque resources", func() {
var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", func() {
f := framework.NewDefaultFramework("opaque-resource")
opaqueResName := v1.OpaqueIntResourceName("foo")
var node *v1.Node
@@ -59,19 +59,11 @@ var _ = framework.KubeDescribe("Opaque resources", func() {
}
}

removeOpaqueResource(f, node.Name, opaqueResName)
addOpaqueResource(f, node.Name, opaqueResName)
})

// TODO: The suite times out if removeOpaqueResource is called as part of
// an AfterEach closure. For now, it is the last statement in each
// It block.
// AfterEach(func() {
// removeOpaqueResource(f, node.Name, opaqueResName)
// })

It("should not break pods that do not consume opaque integer resources.", func() {
defer removeOpaqueResource(f, node.Name, opaqueResName)

By("Creating a vanilla pod")
requests := v1.ResourceList{v1.ResourceCPU: resource.MustParse("0.1")}
limits := v1.ResourceList{v1.ResourceCPU: resource.MustParse("0.2")}
@@ -82,17 +74,19 @@ var _ = framework.KubeDescribe("Opaque resources", func() {
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
return err
}
// Here we don't check for the bound node name since it can land on
// any one (this pod doesn't require any of the opaque resource.)
predicate := scheduleSuccess(pod.Name, "")
predicate := func(e *v1.Event) bool {
return e.Type == v1.EventTypeNormal &&
e.Reason == "Scheduled" &&
// Here we don't check for the bound node name since it can land on
// any one (this pod doesn't require any of the opaque resource.)
strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v", pod.Name))
}
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
})

It("should schedule pods that do consume opaque integer resources.", func() {
defer removeOpaqueResource(f, node.Name, opaqueResName)

By("Creating a pod that requires less of the opaque resource than is allocatable on a node.")
requests := v1.ResourceList{
v1.ResourceCPU: resource.MustParse("0.1"),
@@ -109,15 +103,17 @@ var _ = framework.KubeDescribe("Opaque resources", func() {
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
return err
}
predicate := scheduleSuccess(pod.Name, node.Name)
predicate := func(e *v1.Event) bool {
return e.Type == v1.EventTypeNormal &&
e.Reason == "Scheduled" &&
strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name))
}
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
})

It("should not schedule pods that exceed the available amount of opaque integer resource.", func() {
defer removeOpaqueResource(f, node.Name, opaqueResName)

By("Creating a pod that requires more of the opaque resource than is allocatable on any node")
requests := v1.ResourceList{opaqueResName: resource.MustParse("6")}
limits := v1.ResourceList{}
@@ -127,15 +123,17 @@ var _ = framework.KubeDescribe("Opaque resources", func() {
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(newTestPod(f, "over-max-oir", requests, limits))
return err
}
predicate := scheduleFailure("over-max-oir")
predicate := func(e *v1.Event) bool {
return e.Type == "Warning" &&
e.Reason == "FailedScheduling" &&
strings.Contains(e.Message, "failed to fit in any node")
}
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
})

It("should account opaque integer resources in pods with multiple containers.", func() {
defer removeOpaqueResource(f, node.Name, opaqueResName)

By("Creating a pod with two containers that together require less of the opaque resource than is allocatable on a node")
requests := v1.ResourceList{opaqueResName: resource.MustParse("1")}
limits := v1.ResourceList{}
@@ -172,7 +170,11 @@ var _ = framework.KubeDescribe("Opaque resources", func() {
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
return err
}
predicate := scheduleSuccess(pod.Name, node.Name)
predicate := func(e *v1.Event) bool {
return e.Type == v1.EventTypeNormal &&
e.Reason == "Scheduled" &&
strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name))
}
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
@@ -212,53 +214,11 @@ var _ = framework.KubeDescribe("Opaque resources", func() {
_, err = f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
return err
}
predicate = scheduleFailure(pod.Name)
success, err = observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
})

It("should schedule pods that initially do not fit after enough opaque integer resources are freed.", func() {
defer removeOpaqueResource(f, node.Name, opaqueResName)

By("Creating a pod that requires less of the opaque resource than is allocatable on a node.")
requests := v1.ResourceList{
v1.ResourceCPU: resource.MustParse("0.1"),
opaqueResName: resource.MustParse("3"),
}
limits := v1.ResourceList{
v1.ResourceCPU: resource.MustParse("0.2"),
opaqueResName: resource.MustParse("3"),
predicate = func(e *v1.Event) bool {
return e.Type == "Warning" &&
e.Reason == "FailedScheduling" &&
strings.Contains(e.Message, "failed to fit in any node")
}
pod1 := newTestPod(f, "oir-1", requests, limits)
pod2 := newTestPod(f, "oir-2", requests, limits)

By("Observing an event that indicates one pod was scheduled")
action := func() error {
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod1)
return err
}
predicate := scheduleSuccess(pod1.Name, node.Name)
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))

By("Observing an event that indicates a subsequent pod was not scheduled")
action = func() error {
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod2)
return err
}
predicate = scheduleFailure(pod2.Name)
success, err = observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))

By("Observing an event that indicates the second pod was scheduled after deleting the first pod")
action = func() error {
err := f.ClientSet.Core().Pods(f.Namespace.Name).Delete(pod1.Name, nil)
return err
}
predicate = scheduleSuccess(pod2.Name, node.Name)
success, err = observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
@@ -268,14 +228,12 @@ var _ = framework.KubeDescribe("Opaque resources", func() {
// Adds the opaque resource to a node.
func addOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1.ResourceName) {
action := func() error {
By(fmt.Sprintf("Adding OIR to node [%s]", nodeName))
patch := []byte(fmt.Sprintf(`[{"op": "add", "path": "/status/capacity/%s", "value": "5"}]`, escapeForJSONPatch(opaqueResName)))
return f.ClientSet.Core().RESTClient().Patch(types.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do().Error()
}
predicate := func(n *v1.Node) bool {
capacity, foundCap := n.Status.Capacity[opaqueResName]
allocatable, foundAlloc := n.Status.Allocatable[opaqueResName]
By(fmt.Sprintf("Node [%s] has OIR capacity: [%t] (%s), has OIR allocatable: [%t] (%s)", n.Name, foundCap, capacity.String(), foundAlloc, allocatable.String()))
return foundCap && capacity.MilliValue() == int64(5000) &&
foundAlloc && allocatable.MilliValue() == int64(5000)
}
@@ -287,16 +245,14 @@ func addOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1
// Removes the opaque resource from a node.
func removeOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1.ResourceName) {
action := func() error {
By(fmt.Sprintf("Removing OIR from node [%s]", nodeName))
patch := []byte(fmt.Sprintf(`[{"op": "remove", "path": "/status/capacity/%s"}]`, escapeForJSONPatch(opaqueResName)))
f.ClientSet.Core().RESTClient().Patch(types.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do()
return nil // Ignore error -- the opaque resource may not exist.
}
predicate := func(n *v1.Node) bool {
capacity, foundCap := n.Status.Capacity[opaqueResName]
allocatable, foundAlloc := n.Status.Allocatable[opaqueResName]
By(fmt.Sprintf("Node [%s] has OIR capacity: [%t] (%s), has OIR allocatable: [%t] (%s)", n.Name, foundCap, capacity.String(), foundAlloc, allocatable.String()))
return (!foundCap || capacity.IsZero()) && (!foundAlloc || allocatable.IsZero())
_, foundCap := n.Status.Capacity[opaqueResName]
_, foundAlloc := n.Status.Allocatable[opaqueResName]
return !foundCap && !foundAlloc
}
success, err := observeNodeUpdateAfterAction(f, nodeName, predicate, action)
Expect(err).NotTo(HaveOccurred())
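Both patch bodies in addOpaqueResource and removeOpaqueResource embed the opaque resource name in a JSON Pointer path, where any '/' in the name must be escaped. escapeForJSONPatch itself is not shown in this diff; under RFC 6901, a plausible sketch is:

```go
package main

import (
	"fmt"
	"strings"
)

// escapeForJSONPatch is a guess at the helper used above, applying RFC 6901
// JSON Pointer escaping: '~' becomes "~0" first, then '/' becomes "~1".
func escapeForJSONPatch(name string) string {
	return strings.Replace(strings.Replace(name, "~", "~0", -1), "/", "~1", -1)
}

func main() {
	// Assumed OIR naming convention, shown for illustration only.
	name := "pod.alpha.kubernetes.io/opaque-int-resource-foo"
	patch := fmt.Sprintf(`[{"op": "add", "path": "/status/capacity/%s", "value": "5"}]`,
		escapeForJSONPatch(name))
	fmt.Println(patch)
}
```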
@@ -389,7 +345,7 @@ func observeEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Eve
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
e, ok := obj.(*v1.Event)
By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message))
By(fmt.Sprintf("Considering event: \nType = [%s], Reason = [%s], Message = [%s]", e.Type, e.Reason, e.Message))
Expect(ok).To(Equal(true))
if ok && eventPredicate(e) {
observedMatchingEvent = true
@@ -417,20 +373,3 @@ func observeEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Eve
})
return err == nil, err
}
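observeEventAfterAction follows a subscribe, act, then poll pattern: the event handler is registered before the action runs so no event is missed, and the test then waits for a matching observation. A schematic reduction of the pattern with a channel standing in for the informer (a hypothetical helper, not the e2e framework's API):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// observeAfterAction assumes the observer (the channel) was registered
// before action is invoked, then polls for a matching observation until
// the timeout expires.
func observeAfterAction(events <-chan string, predicate func(string) bool,
	action func() error, timeout time.Duration) (bool, error) {
	if err := action(); err != nil {
		return false, err
	}
	deadline := time.After(timeout)
	for {
		select {
		case e := <-events:
			if predicate(e) {
				return true, nil
			}
		case <-deadline:
			return false, errors.New("timed out waiting for a matching event")
		}
	}
}

func main() {
	events := make(chan string, 8) // observer exists before the action fires
	action := func() error {
		events <- "Scheduled: Successfully assigned oir-pod to node-1"
		return nil
	}
	matched, err := observeAfterAction(events, func(e string) bool {
		return e == "Scheduled: Successfully assigned oir-pod to node-1"
	}, action, time.Second)
	fmt.Println(matched, err) // true <nil>
}
```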

func scheduleSuccess(podName, nodeName string) func(*v1.Event) bool {
return func(e *v1.Event) bool {
return e.Type == v1.EventTypeNormal &&
e.Reason == "Scheduled" &&
strings.HasPrefix(e.Name, podName) &&
strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", podName, nodeName))
}
}

func scheduleFailure(podName string) func(*v1.Event) bool {
return func(e *v1.Event) bool {
return strings.HasPrefix(e.Name, podName) &&
e.Type == "Warning" &&
e.Reason == "FailedScheduling"
}
}
