Skip to content

Commit

Permalink
Merge pull request kubernetes#42928 from bsalamat/e2e_flake_predicates
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 43180, 42928)

Fix waitForScheduler in scheduler predicates e2e tests

**What this PR does / why we need it**: Fixes waitForScheduler in e2e to resolve flaky tests in scheduler_predicates.go

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes kubernetes#42691

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
  • Loading branch information
Kubernetes Submit Queue authored Mar 16, 2017
2 parents 1357caf + 2775a52 commit 5139da2
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 44 deletions.
6 changes: 4 additions & 2 deletions test/e2e/common/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ import (
. "github.com/onsi/gomega"
)

// Action is a step whose side effects on the cluster are observed by the
// Observe*AfterAction helpers below; it returns an error if the step fails.
type Action func() error

// Returns true if a node update matching the predicate was emitted from the
// system after performing the supplied action.
func ObserveNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodePredicate func(*v1.Node) bool, action func() error) (bool, error) {
func ObserveNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodePredicate func(*v1.Node) bool, action Action) (bool, error) {
observedMatchingNode := false
nodeSelector := fields.OneTermEqualSelector("metadata.name", nodeName)
informerStartedChan := make(chan struct{})
Expand Down Expand Up @@ -94,7 +96,7 @@ func ObserveNodeUpdateAfterAction(f *framework.Framework, nodeName string, nodeP

// Returns true if an event matching the predicate was emitted from the system
// after performing the supplied action.
func ObserveEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Event) bool, action func() error) (bool, error) {
func ObserveEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Event) bool, action Action) (bool, error) {
observedMatchingEvent := false
informerStartedChan := make(chan struct{})
var informerStartedGuard sync.Once
Expand Down
98 changes: 56 additions & 42 deletions test/e2e/scheduling/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/test/e2e/common"
"k8s.io/kubernetes/test/e2e/framework"
testutils "k8s.io/kubernetes/test/utils"

Expand Down Expand Up @@ -133,11 +134,10 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
}), true, framework.Logf))
}
podName := "additional-pod"
createPausePod(f, pausePodConfig{
WaitForSchedulerAfterAction(f, createPausePodAction(f, pausePodConfig{
Name: podName,
Labels: map[string]string{"name": "additional"},
})
waitForScheduler()
}), podName, false)
verifyResult(cs, podsNeededForSaturation, 1, ns)
})

Expand Down Expand Up @@ -202,16 +202,16 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
}), true, framework.Logf))
}
podName := "additional-pod"
createPausePod(f, pausePodConfig{
conf := pausePodConfig{
Name: podName,
Labels: map[string]string{"name": "additional"},
Resources: &v1.ResourceRequirements{
Limits: v1.ResourceList{
"cpu": *resource.NewMilliQuantity(milliCpuPerPod, "DecimalSI"),
},
},
})
waitForScheduler()
}
WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podName, false)
verifyResult(cs, podsNeededForSaturation, 1, ns)
})

Expand All @@ -223,22 +223,22 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {

framework.WaitForStableCluster(cs, masterNodes)

createPausePod(f, pausePodConfig{
conf := pausePodConfig{
Name: podName,
Labels: map[string]string{"name": "restricted"},
NodeSelector: map[string]string{
"label": "nonempty",
},
})
}

waitForScheduler()
WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podName, false)
verifyResult(cs, 0, 1, ns)
})

It("validates that a pod with an invalid NodeAffinity is rejected", func() {
By("Trying to launch a pod with an invalid Affinity data.")
podName := "without-label"
_, err := cs.Core().Pods(ns).Create(initPausePod(f, pausePodConfig{
_, err := cs.CoreV1().Pods(ns).Create(initPausePod(f, pausePodConfig{
Name: podName,
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
Expand All @@ -256,9 +256,6 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
if err == nil || !errors.IsInvalid(err) {
framework.Failf("Expect error of invalid, got : %v", err)
}

// Wait a bit to allow scheduler to do its thing if the pod is not rejected.
waitForScheduler()
})

It("validates that NodeSelector is respected if matching [Conformance]", func() {
Expand Down Expand Up @@ -300,7 +297,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {

framework.WaitForStableCluster(cs, masterNodes)

createPausePod(f, pausePodConfig{
conf := pausePodConfig{
Name: podName,
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
Expand Down Expand Up @@ -328,8 +325,8 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
},
},
Labels: map[string]string{"name": "restricted"},
})
waitForScheduler()
}
WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podName, false)
verifyResult(cs, 0, 1, ns)
})

Expand Down Expand Up @@ -378,7 +375,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
// already when the kubelet does not know about its new label yet. The
// kubelet will then refuse to launch the pod.
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName))
labelPod, err := cs.Core().Pods(ns).Get(labelPodName, metav1.GetOptions{})
labelPod, err := cs.CoreV1().Pods(ns).Get(labelPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(labelPod.Spec.NodeName).To(Equal(nodeName))
})
Expand All @@ -388,7 +385,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
It("validates that a pod with an invalid podAffinity is rejected because of the LabelSelectorRequirement is invalid", func() {
By("Trying to launch a pod with an invalid pod Affinity data.")
podName := "without-label-" + string(uuid.NewUUID())
_, err := cs.Core().Pods(ns).Create(initPausePod(f, pausePodConfig{
_, err := cs.CoreV1().Pods(ns).Create(initPausePod(f, pausePodConfig{
Name: podName,
Labels: map[string]string{"name": "without-label"},
Affinity: &v1.Affinity{
Expand All @@ -414,17 +411,14 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
if err == nil || !errors.IsInvalid(err) {
framework.Failf("Expect error of invalid, got : %v", err)
}

// Wait a bit to allow scheduler to do its thing if the pod is not rejected.
waitForScheduler()
})

// Test Nodes does not have any pod, hence it should be impossible to schedule a Pod with pod affinity.
It("validates that Inter-pod-Affinity is respected if not matching", func() {
By("Trying to schedule Pod with nonempty Pod Affinity.")
framework.WaitForStableCluster(cs, masterNodes)
podName := "without-label-" + string(uuid.NewUUID())
createPausePod(f, pausePodConfig{
conf := pausePodConfig{
Name: podName,
Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{
Expand All @@ -444,9 +438,9 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
},
},
},
})
}

waitForScheduler()
WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podName, false)
verifyResult(cs, 0, 1, ns)
})

Expand Down Expand Up @@ -492,7 +486,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
// already when the kubelet does not know about its new label yet. The
// kubelet will then refuse to launch the pod.
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName))
labelPod, err := cs.Core().Pods(ns).Get(labelPodName, metav1.GetOptions{})
labelPod, err := cs.CoreV1().Pods(ns).Get(labelPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(labelPod.Spec.NodeName).To(Equal(nodeName))
})
Expand All @@ -506,7 +500,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
By("Launching two pods on two distinct nodes to get two node names")
CreateHostPortPods(f, "host-port", 2, true)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, "host-port")
podList, err := cs.Core().Pods(ns).List(metav1.ListOptions{})
podList, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{})
framework.ExpectNoError(err)
Expect(len(podList.Items)).To(Equal(2))
nodeNames := []string{podList.Items[0].Spec.NodeName, podList.Items[1].Spec.NodeName}
Expand All @@ -532,7 +526,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {

By("Trying to launch another pod, now with podAntiAffinity with same Labels.")
labelPodName := "with-podantiaffinity-" + string(uuid.NewUUID())
createPausePod(f, pausePodConfig{
conf := pausePodConfig{
Name: labelPodName,
Labels: map[string]string{"service": "Diff"},
NodeSelector: map[string]string{k: v}, // only launch on our two nodes, contradicting the podAntiAffinity
Expand All @@ -555,9 +549,9 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
},
},
},
})
}

waitForScheduler()
WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), labelPodName, false)
verifyResult(cs, 3, 1, ns)
})

Expand Down Expand Up @@ -609,7 +603,7 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {
// already when the kubelet does not know about its new label yet. The
// kubelet will then refuse to launch the pod.
framework.ExpectNoError(framework.WaitForPodNotPending(cs, ns, labelPodName))
labelPod, err := cs.Core().Pods(ns).Get(labelPodName, metav1.GetOptions{})
labelPod, err := cs.CoreV1().Pods(ns).Get(labelPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
Expect(labelPod.Spec.NodeName).To(Equal(nodeName))
})
Expand Down Expand Up @@ -732,18 +726,16 @@ var _ = framework.KubeDescribe("SchedulerPredicates [Serial]", func() {

By("Trying to relaunch the pod, still no tolerations.")
podNameNoTolerations := "still-no-tolerations"
createPausePod(f, pausePodConfig{
conf := pausePodConfig{
Name: podNameNoTolerations,
NodeSelector: map[string]string{labelKey: labelValue},
})
}

waitForScheduler()
WaitForSchedulerAfterAction(f, createPausePodAction(f, conf), podNameNoTolerations, false)
verifyResult(cs, 0, 1, ns)

By("Removing taint off the node")
framework.RemoveTaintOffNode(cs, nodeName, testTaint)

waitForScheduler()
WaitForSchedulerAfterAction(f, removeTaintFromNodeAction(cs, nodeName, testTaint), podNameNoTolerations, true)
verifyResult(cs, 1, 0, ns)
})
})
Expand Down Expand Up @@ -900,16 +892,38 @@ func getRequestedCPU(pod v1.Pod) int64 {
return result
}

func waitForScheduler() {
// Wait a bit to allow scheduler to do its thing
// TODO: this is brittle; there's no guarantee the scheduler will have run in 10 seconds.
framework.Logf("Sleeping 10 seconds and crossing our fingers that scheduler will run in that time.")
time.Sleep(10 * time.Second)
// removeTaintFromNodeAction builds a common.Action that, when invoked,
// strips the supplied taint from the node named nodeName. The returned
// action always reports success, since RemoveTaintOffNode handles its own
// error reporting through the framework.
func removeTaintFromNodeAction(cs clientset.Interface, nodeName string, taint v1.Taint) common.Action {
	action := func() error {
		framework.RemoveTaintOffNode(cs, nodeName, taint)
		return nil
	}
	return action
}

// createPausePodAction builds a common.Action that, when invoked, submits a
// pause pod constructed from conf into the test framework's namespace and
// reports the API server's create error, if any.
func createPausePodAction(f *framework.Framework, conf pausePodConfig) common.Action {
	return func() error {
		pod := initPausePod(f, conf)
		_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(pod)
		return err
	}
}

// WaitForSchedulerAfterAction performs the provided action and then waits for
// the scheduler to act on the given pod. When expectSuccess is true it waits
// for a successful-scheduling event (on any node); otherwise it waits for a
// scheduling-failure event. The test fails if observing the event errors or
// if no matching event is seen.
func WaitForSchedulerAfterAction(f *framework.Framework, action common.Action, podName string, expectSuccess bool) {
	predicate := scheduleFailureEvent(podName)
	if expectSuccess {
		predicate = scheduleSuccessEvent(podName, "" /* any node */)
	}
	success, err := common.ObserveEventAfterAction(f, predicate, action)
	Expect(err).NotTo(HaveOccurred())
	// BeTrue() is the idiomatic Gomega matcher for boolean assertions and
	// yields a clearer failure message than Equal(true).
	Expect(success).To(BeTrue())
}

// TODO: upgrade calls in PodAffinity tests when we're able to run them
func verifyResult(c clientset.Interface, expectedScheduled int, expectedNotScheduled int, ns string) {
allPods, err := c.Core().Pods(ns).List(metav1.ListOptions{})
allPods, err := c.CoreV1().Pods(ns).List(metav1.ListOptions{})
framework.ExpectNoError(err)
scheduledPods, notScheduledPods := framework.GetPodsScheduled(masterNodes, allPods)

Expand Down

0 comments on commit 5139da2

Please sign in to comment.