diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 7606f89e8a587..42db66cfe38ed 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -114,20 +114,26 @@ type portsByPodName map[string][]int // return false only in case of unexpected errors. func checkAffinity(execPod *v1.Pod, serviceIP string, servicePort int, shouldHold bool) bool { serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort)) - cmd := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, serviceIPPort) + curl := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, serviceIPPort) + cmd := fmt.Sprintf("for i in $(seq 0 %d); do echo; %s ; done", AffinityConfirmCount, curl) timeout := AffinityTimeout if execPod == nil { timeout = LoadBalancerPollTimeout } var tracker affinityTracker - if pollErr := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { + // interval considering a maximum of 2 seconds per connection + interval := 2 * AffinityConfirmCount * time.Second + if pollErr := wait.PollImmediate(interval, timeout, func() (bool, error) { if execPod != nil { stdout, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd) if err != nil { framework.Logf("Failed to get response from %s. Retry until timeout", serviceIPPort) return false, nil } - tracker.recordHost(stdout) + hosts := strings.Split(stdout, "\n") + for _, host := range hosts { + tracker.recordHost(strings.TrimSpace(host)) + } } else { rawResponse := GetHTTPContent(serviceIP, servicePort, timeout, "") tracker.recordHost(rawResponse.String()) @@ -2357,28 +2363,42 @@ var _ = SIGDescribe("Services", func() { }) // [LinuxOnly]: Windows does not support session affinity. - ginkgo.It("should have session affinity work for service with type clusterIP [LinuxOnly] [Flaky]", func() { + ginkgo.It("should have session affinity work for service with type clusterIP [LinuxOnly]", func() { svc := getServeHostnameService("affinity-clusterip") svc.Spec.Type = v1.ServiceTypeClusterIP execAffinityTestForNonLBService(f, cs, svc) }) // [LinuxOnly]: Windows does not support session affinity. - ginkgo.It("should be able to switch session affinity for service with type clusterIP [LinuxOnly] [Flaky]", func() { + ginkgo.It("should have session affinity timeout work for service with type clusterIP [LinuxOnly]", func() { + svc := getServeHostnameService("affinity-clusterip-timeout") + svc.Spec.Type = v1.ServiceTypeClusterIP + execAffinityTestForSessionAffinityTimeout(f, cs, svc) + }) + + // [LinuxOnly]: Windows does not support session affinity. + ginkgo.It("should be able to switch session affinity for service with type clusterIP [LinuxOnly]", func() { svc := getServeHostnameService("affinity-clusterip-transition") svc.Spec.Type = v1.ServiceTypeClusterIP execAffinityTestForNonLBServiceWithTransition(f, cs, svc) }) // [LinuxOnly]: Windows does not support session affinity. - ginkgo.It("should have session affinity work for NodePort service [LinuxOnly] [Flaky]", func() { + ginkgo.It("should have session affinity work for NodePort service [LinuxOnly]", func() { svc := getServeHostnameService("affinity-nodeport") svc.Spec.Type = v1.ServiceTypeNodePort execAffinityTestForNonLBService(f, cs, svc) }) // [LinuxOnly]: Windows does not support session affinity. - ginkgo.It("should be able to switch session affinity for NodePort service [LinuxOnly] [Flaky]", func() { + ginkgo.It("should have session affinity timeout work for NodePort service [LinuxOnly]", func() { + svc := getServeHostnameService("affinity-nodeport-timeout") + svc.Spec.Type = v1.ServiceTypeNodePort + execAffinityTestForSessionAffinityTimeout(f, cs, svc) + }) + + // [LinuxOnly]: Windows does not support session affinity. + ginkgo.It("should be able to switch session affinity for NodePort service [LinuxOnly]", func() { svc := getServeHostnameService("affinity-nodeport-transition") svc.Spec.Type = v1.ServiceTypeNodePort execAffinityTestForNonLBServiceWithTransition(f, cs, svc) @@ -3036,6 +3056,69 @@ func execSourceipTest(pausePod v1.Pod, serviceAddress string) (string, string) { return pausePod.Status.PodIP, host } +// execAffinityTestForSessionAffinityTimeout is a helper function that wrap the logic of +// affinity test for non-load-balancer services. Session afinity will be +// enabled when the service is created and a short timeout will be configured so +// session affinity must change after the timeout expirese. +func execAffinityTestForSessionAffinityTimeout(f *framework.Framework, cs clientset.Interface, svc *v1.Service) { + ns := f.Namespace.Name + numPods, servicePort, serviceName := 3, defaultServeHostnameServicePort, svc.ObjectMeta.Name + ginkgo.By("creating service in namespace " + ns) + serviceType := svc.Spec.Type + // set an affinity timeout equal to the number of connection requests + svcSessionAffinityTimeout := int32(AffinityConfirmCount) + svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP + svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{ + ClientIP: &v1.ClientIPConfig{TimeoutSeconds: &svcSessionAffinityTimeout}, + } + _, _, err := StartServeHostnameService(cs, svc, ns, numPods) + framework.ExpectNoError(err, "failed to create replication controller with service in the namespace: %s", ns) + defer func() { + StopServeHostnameService(cs, ns, serviceName) + }() + jig := e2eservice.NewTestJig(cs, ns, serviceName) + svc, err = jig.Client.CoreV1().Services(ns).Get(context.TODO(), serviceName, metav1.GetOptions{}) + framework.ExpectNoError(err, "failed to fetch service: %s in namespace: %s", serviceName, ns) + var svcIP string + if serviceType == v1.ServiceTypeNodePort { + nodes, err := e2enode.GetReadySchedulableNodes(cs) + framework.ExpectNoError(err) + addrs := e2enode.CollectAddresses(nodes, v1.NodeInternalIP) + gomega.Expect(len(addrs)).To(gomega.BeNumerically(">", 0), "ginkgo.Failed to get Node internal IP") + svcIP = addrs[0] + servicePort = int(svc.Spec.Ports[0].NodePort) + } else { + svcIP = svc.Spec.ClusterIP + } + + execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod-affinity", nil) + defer func() { + framework.Logf("Cleaning up the exec pod") + err := cs.CoreV1().Pods(ns).Delete(context.TODO(), execPod.Name, nil) + framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", execPod.Name, ns) + }() + err = jig.CheckServiceReachability(svc, execPod) + framework.ExpectNoError(err) + + // the service should be sticky until the timeout expires + framework.ExpectEqual(checkAffinity(execPod, svcIP, servicePort, true), true) + // but it should return different hostnames after the timeout expires + // try several times to avoid the probability that we hit the same pod twice + hosts := sets.NewString() + cmd := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, net.JoinHostPort(svcIP, strconv.Itoa(servicePort))) + for i := 0; i < 10; i++ { + hostname, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd) + if err == nil { + hosts.Insert(hostname) + if hosts.Len() > 1 { + return + } + time.Sleep(time.Duration(svcSessionAffinityTimeout) * time.Second) + } + } + framework.Fail("Session is sticky after reaching the timeout") +} + func execAffinityTestForNonLBServiceWithTransition(f *framework.Framework, cs clientset.Interface, svc *v1.Service) { execAffinityTestForNonLBServiceWithOptionalTransition(f, cs, svc, true) }