From 5964c216b328cd9cabbc466559bb72847f9787f9 Mon Sep 17 00:00:00 2001 From: terasihma Date: Wed, 27 Mar 2024 05:01:24 +0000 Subject: [PATCH] Fix to check that egress_watcher pick a valid client Signed-off-by: terasihma --- v2/controllers/egress_watcher.go | 18 ++++ v2/controllers/egress_watcher_test.go | 114 ++++++++++++++++++++------ v2/controllers/pod_watcher_test.go | 23 +++--- 3 files changed, 119 insertions(+), 36 deletions(-) diff --git a/v2/controllers/egress_watcher.go b/v2/controllers/egress_watcher.go index 3b49d004..93262841 100644 --- a/v2/controllers/egress_watcher.go +++ b/v2/controllers/egress_watcher.go @@ -55,7 +55,25 @@ func (r *EgressWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R return ctrl.Result{}, err } + targetPods := make(map[string]corev1.Pod) for _, pod := range pods.Items { + if pod.Spec.HostNetwork { + // Pods in host network cannot use egress NAT. + // So skip it. + continue + } + podIp := pod.Status.PodIP + // The reconciliation should be triggered only for running pods. + if pod.Status.Phase == corev1.PodRunning { + if _, found := targetPods[podIp]; found { + // multiple running pods have the same address. + return ctrl.Result{}, fmt.Errorf("multiple pods have the same address: %s", podIp) + } + targetPods[podIp] = pod + } + } + + for _, pod := range targetPods { for k, v := range pod.Annotations { if !strings.HasPrefix(k, constants.AnnEgressPrefix) { continue diff --git a/v2/controllers/egress_watcher_test.go b/v2/controllers/egress_watcher_test.go index 3ace46b7..ce898e7f 100644 --- a/v2/controllers/egress_watcher_test.go +++ b/v2/controllers/egress_watcher_test.go @@ -2,8 +2,8 @@ package controllers import ( "context" + "fmt" "net" - "reflect" "sync" "time" @@ -21,7 +21,7 @@ import ( var _ = Describe("Egress watcher", func() { ctx := context.Background() - podNetwork := &mockPodNetwork{ips: make(map[string]bool)} + podNetwork := &mockPodNetwork{ips: make(map[string]int)} var cancel context.CancelFunc BeforeEach(func() { @@ -43,13 +43,16 @@ var _ = Describe("Egress watcher", func() { makePod("pod1", []string{"10.1.1.2", "fd01::2"}, map[string]string{ "default": "egress1", - }) + }, corev1.PodRunning) makePod("pod2", []string{"10.1.1.3", "fd01::3"}, map[string]string{ "default": "egress2", - }) + }, corev1.PodRunning) makePod("pod3", []string{"10.1.1.4", "fd01::4"}, map[string]string{ "internet": "egress1", - }) + }, corev1.PodRunning) + makePod("pod4", []string{"10.1.1.5", "fd01::5"}, map[string]string{ + "default": "egress1", + }, corev1.PodSucceeded) ctx, cancel = context.WithCancel(context.TODO()) mgr, err := ctrl.NewManager(cfg, ctrl.Options{ @@ -97,26 +100,72 @@ var _ = Describe("Egress watcher", func() { Expect(err).ToNot(HaveOccurred()) // pod1 is a client of default/egress1, but pod2 and pod3 are the clients of other egresses - Eventually(func() bool { - return reflect.DeepEqual(podNetwork.updatedPodIPs(), map[string]bool{ - "10.1.1.2": true, - "fd01::2": true, - }) - }).Should(BeTrue()) - - Consistently(func() bool { - return reflect.DeepEqual(podNetwork.updatedPodIPs(), map[string]bool{ - "10.1.1.2": true, - "fd01::2": true, - }) - }, 5*time.Second, 1*time.Second).Should(BeTrue()) - }) + // pod4 is not the target of the reconciliation because pod4 is not running. + Eventually(func() error { + updated := podNetwork.updatedPodIPs() + updated4, ok := updated["10.1.1.2"] + if !ok || updated4 != 1 { + return fmt.Errorf("pod1's IPv4 address is not updated") + } + updated6, ok := updated["fd01::2"] + if !ok || updated6 != 1 { + return fmt.Errorf("pod1's IPv4 address is not updated") + } + return nil + }).Should(Succeed()) + + Consistently(func() error { + updated := podNetwork.updatedPodIPs() + updated4, ok := updated["10.1.1.2"] + if !ok || updated4 != 1 { + return fmt.Errorf("pod1's IPv4 address is not updated") + } + updated6, ok := updated["fd01::2"] + if !ok || updated6 != 1 { + return fmt.Errorf("pod1's IPv4 address is not updated") + } + return nil + }, 5*time.Second, 1*time.Second).Should(Succeed()) + + makePod("pod5", []string{"10.1.1.3", "fd01::3"}, map[string]string{ + "default": "egress2", + }, corev1.PodRunning) + + eg2 := makeEgress("egress2") + err = k8sClient.Create(ctx, eg2) + Expect(err).ShouldNot(HaveOccurred()) + svc2 := &corev1.Service{} + svc2.Namespace = "default" + svc2.Name = "egress2" + svc2.Spec.Type = corev1.ServiceTypeClusterIP + svc2.Spec.Ports = []corev1.ServicePort{{ + Port: 5555, + TargetPort: intstr.FromInt(5555), + Protocol: corev1.ProtocolUDP, + }} + err = k8sClient.Create(ctx, svc2) + Expect(err).ShouldNot(HaveOccurred()) + + // Reconciliation should fail because of duplicate address + Consistently(func() error { + updated := podNetwork.updatedPodIPs() + _, ok := updated["10.1.1.3"] + if ok { + return fmt.Errorf("pod2 and pod5 should not be updated") + } + _, ok = updated["fd01::3"] + if ok { + return fmt.Errorf("pod2 and pod5 should not be updated") + } + return nil + }, 5*time.Second, 1*time.Second).Should(Succeed()) + }) }) type mockPodNetwork struct { nUpdate int - ips map[string]bool + ips map[string]int mu sync.Mutex } @@ -137,19 +186,34 @@ func (p *mockPodNetwork) Update(podIPv4, podIPv6 net.IP, hook nodenet.SetupHook) defer p.mu.Unlock() p.nUpdate++ - p.ips[podIPv4.String()] = true - p.ips[podIPv6.String()] = true + c4, ok := p.ips[podIPv4.String()] + if !ok { + p.ips[podIPv4.String()] = 1 + } else { + c4 += 1 + } + c6, ok := p.ips[podIPv6.String()] + if !ok { + p.ips[podIPv6.String()] = 1 + } else { + c6 += 1 + } return nil } -func (p *mockPodNetwork) updatedPodIPs() map[string]bool { - m := make(map[string]bool) +func (p *mockPodNetwork) updatedPodIPs() map[string]int { + m := make(map[string]int) p.mu.Lock() defer p.mu.Unlock() for k := range p.ips { - m[k] = true + c, ok := m[k] + if !ok { + m[k] = 1 + } else { + c += 1 + } } return m } diff --git a/v2/controllers/pod_watcher_test.go b/v2/controllers/pod_watcher_test.go index 601f2b79..0db745da 100644 --- a/v2/controllers/pod_watcher_test.go +++ b/v2/controllers/pod_watcher_test.go @@ -41,7 +41,7 @@ func makePodWithHostNetwork(name string, ips []string, egresses map[string]strin ExpectWithOffset(1, err).ShouldNot(HaveOccurred()) } -func makePod(name string, ips []string, egresses map[string]string) { +func makePod(name string, ips []string, egresses map[string]string, phase corev1.PodPhase) { pod := &corev1.Pod{} pod.Name = name pod.Namespace = "default" @@ -62,6 +62,7 @@ func makePod(name string, ips []string, egresses map[string]string) { podIPs[i] = corev1.PodIP{IP: ip} } pod.Status.PodIPs = podIPs + pod.Status.Phase = phase err = k8sClient.Status().Update(context.Background(), pod) ExpectWithOffset(1, err).ShouldNot(HaveOccurred()) } @@ -82,17 +83,17 @@ var _ = Describe("Pod watcher", func() { var eg *mockEgress BeforeEach(func() { - makePod("pod1", []string{"10.1.1.1", "fd01::1"}, nil) + makePod("pod1", []string{"10.1.1.1", "fd01::1"}, nil, corev1.PodRunning) makePod("pod2", []string{"10.1.1.2", "fd01::2"}, map[string]string{ "internet": "egress2", "external": "egress1,egress2", - }) + }, corev1.PodRunning) makePod("pod3", []string{"fd01::3"}, map[string]string{ "internet": "egress1,egress2", - }) + }, corev1.PodRunning) makePod("pod4", []string{"fd01::4"}, map[string]string{ "external": "egress1", - }) + }, corev1.PodRunning) ctx, cancel = context.WithCancel(context.TODO()) ft = &mockFoUTunnel{peers: make(map[string]bool)} @@ -144,10 +145,10 @@ var _ = Describe("Pod watcher", func() { }) It("should handle new Pods", func() { - makePod("pod5", []string{"10.1.1.5"}, nil) + makePod("pod5", []string{"10.1.1.5"}, nil, corev1.PodRunning) makePod("pod6", []string{"10.1.1.6"}, map[string]string{ "internet": "egress2", - }) + }, corev1.PodRunning) Eventually(func() bool { return reflect.DeepEqual(ft.GetPeers(), map[string]bool{ "10.1.1.2": true, @@ -295,7 +296,7 @@ var _ = Describe("Pod watcher", func() { It("should not delete a peer that another pod is reusing", func() { makePod("job", []string{"10.1.1.5"}, map[string]string{ "internet": "egress2", - }) + }, corev1.PodRunning) Eventually(func() bool { return reflect.DeepEqual(ft.GetPeers(), map[string]bool{ @@ -325,7 +326,7 @@ var _ = Describe("Pod watcher", func() { // another pod reuses the same ip makePod("another", []string{"10.1.1.5"}, map[string]string{ "internet": "egress2", - }) + }, corev1.PodRunning) jobPod = &corev1.Pod{} err = k8sClient.Get(ctx, client.ObjectKey{Namespace: "default", Name: "job"}, jobPod) @@ -347,7 +348,7 @@ var _ = Describe("Pod watcher", func() { It("should not delete a peer when another pod accidentally hits the same one", func() { makePod("job", []string{"10.1.1.5"}, map[string]string{ "internet": "egress2", - }) + }, corev1.PodRunning) Eventually(func() bool { return reflect.DeepEqual(ft.GetPeers(), map[string]bool{ @@ -361,7 +362,7 @@ var _ = Describe("Pod watcher", func() { // another pod hits the same ip and trigger the adding pod event makePod("another", []string{"10.1.1.5"}, map[string]string{ "internet": "egress2", - }) + }, corev1.PodRunning) jobPod := &corev1.Pod{} err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "default", Name: "job"}, jobPod)