Merge pull request kubernetes#88409 from aojea/affinity
deflake e2e session affinity tests
k8s-ci-robot authored Feb 25, 2020
2 parents 46fcbcf + 64c4876 commit b5e95fc
Showing 1 changed file with 90 additions and 7 deletions.
97 changes: 90 additions & 7 deletions test/e2e/network/service.go
@@ -114,20 +114,26 @@ type portsByPodName map[string][]int
 // return false only in case of unexpected errors.
 func checkAffinity(execPod *v1.Pod, serviceIP string, servicePort int, shouldHold bool) bool {
 	serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
-	cmd := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, serviceIPPort)
+	curl := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, serviceIPPort)
+	cmd := fmt.Sprintf("for i in $(seq 0 %d); do echo; %s ; done", AffinityConfirmCount, curl)
 	timeout := AffinityTimeout
 	if execPod == nil {
 		timeout = LoadBalancerPollTimeout
 	}
 	var tracker affinityTracker
-	if pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
+	// interval considering a maximum of 2 seconds per connection
+	interval := 2 * AffinityConfirmCount * time.Second
+	if pollErr := wait.PollImmediate(interval, timeout, func() (bool, error) {
 		if execPod != nil {
 			stdout, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
 			if err != nil {
 				framework.Logf("Failed to get response from %s. Retry until timeout", serviceIPPort)
 				return false, nil
 			}
-			tracker.recordHost(stdout)
+			hosts := strings.Split(stdout, "\n")
+			for _, host := range hosts {
+				tracker.recordHost(strings.TrimSpace(host))
+			}
 		} else {
 			rawResponse := GetHTTPContent(serviceIP, servicePort, timeout, "")
 			tracker.recordHost(rawResponse.String())
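The deflake in this hunk works by batching: instead of one curl per poll attempt, checkAffinity now issues AffinityConfirmCount requests in a single exec and records each returned hostname, and the poll interval is widened to allow 2 seconds per request (assuming AffinityConfirmCount is 15, that is up to 30 seconds per attempt). Below is a loose, standalone sketch of the per-line parsing; splitHosts and the sample pod name are invented for illustration, and unlike the real code it drops the blank separator lines instead of handing them to the affinityTracker.

package main

import (
	"fmt"
	"strings"
)

// splitHosts sketches the new per-line handling in checkAffinity: the batched
// curl output contains one hostname per request, separated by echoed newlines,
// and each trimmed, non-empty line is collected individually.
func splitHosts(stdout string) []string {
	var hosts []string
	for _, host := range strings.Split(stdout, "\n") {
		if h := strings.TrimSpace(host); h != "" {
			hosts = append(hosts, h)
		}
	}
	return hosts
}

func main() {
	// Hypothetical output of the batched probe: every request hit the same pod.
	out := "\naffinity-clusterip-abc12\n\naffinity-clusterip-abc12\n"
	fmt.Println(splitHosts(out)) // [affinity-clusterip-abc12 affinity-clusterip-abc12]
}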
@@ -2357,28 +2363,42 @@ var _ = SIGDescribe("Services", func() {
 	})
 
 	// [LinuxOnly]: Windows does not support session affinity.
-	ginkgo.It("should have session affinity work for service with type clusterIP [LinuxOnly] [Flaky]", func() {
+	ginkgo.It("should have session affinity work for service with type clusterIP [LinuxOnly]", func() {
 		svc := getServeHostnameService("affinity-clusterip")
 		svc.Spec.Type = v1.ServiceTypeClusterIP
 		execAffinityTestForNonLBService(f, cs, svc)
 	})
 
 	// [LinuxOnly]: Windows does not support session affinity.
-	ginkgo.It("should be able to switch session affinity for service with type clusterIP [LinuxOnly] [Flaky]", func() {
+	ginkgo.It("should have session affinity timeout work for service with type clusterIP [LinuxOnly]", func() {
+		svc := getServeHostnameService("affinity-clusterip-timeout")
+		svc.Spec.Type = v1.ServiceTypeClusterIP
+		execAffinityTestForSessionAffinityTimeout(f, cs, svc)
+	})
+
+	// [LinuxOnly]: Windows does not support session affinity.
+	ginkgo.It("should be able to switch session affinity for service with type clusterIP [LinuxOnly]", func() {
 		svc := getServeHostnameService("affinity-clusterip-transition")
 		svc.Spec.Type = v1.ServiceTypeClusterIP
 		execAffinityTestForNonLBServiceWithTransition(f, cs, svc)
 	})
 
 	// [LinuxOnly]: Windows does not support session affinity.
-	ginkgo.It("should have session affinity work for NodePort service [LinuxOnly] [Flaky]", func() {
+	ginkgo.It("should have session affinity work for NodePort service [LinuxOnly]", func() {
		svc := getServeHostnameService("affinity-nodeport")
 		svc.Spec.Type = v1.ServiceTypeNodePort
 		execAffinityTestForNonLBService(f, cs, svc)
 	})
 
 	// [LinuxOnly]: Windows does not support session affinity.
-	ginkgo.It("should be able to switch session affinity for NodePort service [LinuxOnly] [Flaky]", func() {
+	ginkgo.It("should have session affinity timeout work for NodePort service [LinuxOnly]", func() {
+		svc := getServeHostnameService("affinity-nodeport-timeout")
+		svc.Spec.Type = v1.ServiceTypeNodePort
+		execAffinityTestForSessionAffinityTimeout(f, cs, svc)
+	})
+
+	// [LinuxOnly]: Windows does not support session affinity.
+	ginkgo.It("should be able to switch session affinity for NodePort service [LinuxOnly]", func() {
 		svc := getServeHostnameService("affinity-nodeport-transition")
 		svc.Spec.Type = v1.ServiceTypeNodePort
 		execAffinityTestForNonLBServiceWithTransition(f, cs, svc)
@@ -3036,6 +3056,69 @@ func execSourceipTest(pausePod v1.Pod, serviceAddress string) (string, string) {
 	return pausePod.Status.PodIP, host
 }
 
+// execAffinityTestForSessionAffinityTimeout is a helper function that wraps the logic of
+// the affinity test for non-load-balancer services. Session affinity is
+// enabled when the service is created, and a short timeout is configured, so
+// session affinity must change after the timeout expires.
+func execAffinityTestForSessionAffinityTimeout(f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
+	ns := f.Namespace.Name
+	numPods, servicePort, serviceName := 3, defaultServeHostnameServicePort, svc.ObjectMeta.Name
+	ginkgo.By("creating service in namespace " + ns)
+	serviceType := svc.Spec.Type
+	// set an affinity timeout equal to the number of connection requests
+	svcSessionAffinityTimeout := int32(AffinityConfirmCount)
+	svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
+	svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{
+		ClientIP: &v1.ClientIPConfig{TimeoutSeconds: &svcSessionAffinityTimeout},
+	}
+	_, _, err := StartServeHostnameService(cs, svc, ns, numPods)
+	framework.ExpectNoError(err, "failed to create replication controller with service in the namespace: %s", ns)
+	defer func() {
+		StopServeHostnameService(cs, ns, serviceName)
+	}()
+	jig := e2eservice.NewTestJig(cs, ns, serviceName)
+	svc, err = jig.Client.CoreV1().Services(ns).Get(context.TODO(), serviceName, metav1.GetOptions{})
+	framework.ExpectNoError(err, "failed to fetch service: %s in namespace: %s", serviceName, ns)
+	var svcIP string
+	if serviceType == v1.ServiceTypeNodePort {
+		nodes, err := e2enode.GetReadySchedulableNodes(cs)
+		framework.ExpectNoError(err)
+		addrs := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
+		gomega.Expect(len(addrs)).To(gomega.BeNumerically(">", 0), "Failed to get Node internal IP")
+		svcIP = addrs[0]
+		servicePort = int(svc.Spec.Ports[0].NodePort)
+	} else {
+		svcIP = svc.Spec.ClusterIP
+	}
+
+	execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod-affinity", nil)
+	defer func() {
+		framework.Logf("Cleaning up the exec pod")
+		err := cs.CoreV1().Pods(ns).Delete(context.TODO(), execPod.Name, nil)
+		framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", execPod.Name, ns)
+	}()
+	err = jig.CheckServiceReachability(svc, execPod)
+	framework.ExpectNoError(err)
+
+	// the service should be sticky until the timeout expires
+	framework.ExpectEqual(checkAffinity(execPod, svcIP, servicePort, true), true)
+	// but it should return different hostnames after the timeout expires
+	// try several times because, even without affinity, consecutive requests can land on the same pod by chance
+	hosts := sets.NewString()
+	cmd := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, net.JoinHostPort(svcIP, strconv.Itoa(servicePort)))
+	for i := 0; i < 10; i++ {
+		hostname, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
+		if err == nil {
+			hosts.Insert(hostname)
+			if hosts.Len() > 1 {
+				return
+			}
+			time.Sleep(time.Duration(svcSessionAffinityTimeout) * time.Second)
+		}
+	}
+	framework.Fail("Session is sticky after reaching the timeout")
+}
+
 func execAffinityTestForNonLBServiceWithTransition(f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
 	execAffinityTestForNonLBServiceWithOptionalTransition(f, cs, svc, true)
 }
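For context, the stickiness exercised by the new execAffinityTestForSessionAffinityTimeout helper comes from the Service's ClientIP session affinity fields. A minimal sketch of that configuration follows, assuming a 15-second timeout (the helper derives the value from AffinityConfirmCount):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// Sketch of the stickiness settings applied by the helper: ClientIP
	// affinity with a deliberately short timeout, so the client-to-pod
	// binding expires while the test is still running.
	timeoutSeconds := int32(15) // assumed value for illustration
	spec := v1.ServiceSpec{
		SessionAffinity: v1.ServiceAffinityClientIP,
		SessionAffinityConfig: &v1.SessionAffinityConfig{
			ClientIP: &v1.ClientIPConfig{TimeoutSeconds: &timeoutSeconds},
		},
	}
	fmt.Printf("affinity=%s timeout=%ds\n", spec.SessionAffinity, *spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
}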