From b11d5c4f04e5ebc3330e9e36c663c6d4675a8f1c Mon Sep 17 00:00:00 2001 From: Pulkit Jain Date: Fri, 1 Dec 2023 18:14:52 +0530 Subject: [PATCH] Improve Egress API visibility Record event when EgressIP is assigned to the Node interface. Signed-off-by: Pulkit Jain --- .../antrea/templates/agent/clusterrole.yaml | 6 +++ build/yamls/antrea-aks.yml | 6 +++ build/yamls/antrea-eks.yml | 6 +++ build/yamls/antrea-gke.yml | 6 +++ build/yamls/antrea-ipsec.yml | 6 +++ build/yamls/antrea.yml | 6 +++ cmd/antrea-agent/agent.go | 2 +- .../controller/egress/egress_controller.go | 43 +++++++++++++++++-- .../egress/egress_controller_test.go | 1 + .../serviceexternalip/controller.go | 4 +- pkg/agent/ipassigner/ip_assigner.go | 9 +++- pkg/agent/ipassigner/ip_assigner_linux.go | 28 ++++++------ .../ipassigner/testing/mock_ipassigner.go | 14 +++--- test/e2e/egress_test.go | 6 +++ .../agent/ip_assigner_linux_test.go | 6 +-- 15 files changed, 118 insertions(+), 31 deletions(-) diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index 7db11aebb8e..5b4f526af5f 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -219,3 +219,9 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index f33bf08c677..ec62cc0682b 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -6256,6 +6256,12 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index b01e8967873..ec1a33e04b7 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -6256,6 +6256,12 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 774d45b3570..48e5ce9fcf7 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -6256,6 +6256,12 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 4a1dd0423b8..6ef947ab862 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -6269,6 +6269,12 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 28a9c10267d..452ef359835 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -6256,6 +6256,12 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - events + verbs: + - create --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 44604492295..6c30874d141 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -516,7 +516,7 @@ func run(o *Options) error { } if o.enableEgress { egressController, err = egress.NewEgressController( - ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName, + ofClient, k8sClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName, memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, serviceCIDRProvider, o.config.Egress.MaxEgressIPsPerNode, features.DefaultFeatureGate.Enabled(features.EgressTrafficShaping), ) diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index 00d50d668af..bd66b29e1cb 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -33,7 +33,10 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -49,6 +52,7 @@ import ( cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2" crdv1b1 "antrea.io/antrea/pkg/apis/crd/v1beta1" clientsetversioned "antrea.io/antrea/pkg/client/clientset/versioned" + "antrea.io/antrea/pkg/client/clientset/versioned/scheme" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1beta1" crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1beta1" "antrea.io/antrea/pkg/controller/metrics" @@ -136,6 +140,7 @@ type egressBinding struct { type EgressController struct { ofClient openflow.Client routeClient route.Interface + k8sClient kubernetes.Interface crdClient clientsetversioned.Interface antreaClientProvider agent.AntreaClientProvider @@ -175,10 +180,13 @@ type EgressController struct { serviceCIDRUpdateRetryDelay time.Duration trafficShapingEnabled bool + + record record.EventRecorder } func NewEgressController( ofClient openflow.Client, + k8sClient kubernetes.Interface, antreaClientGetter agent.AntreaClientProvider, crdClient clientsetversioned.Interface, ifaceStore interfacestore.InterfaceStore, @@ -196,9 +204,17 @@ func NewEgressController( if trafficShapingEnabled && !openflow.OVSMetersAreSupported() { klog.Info("EgressTrafficShaping feature gate is enabled, but it is ignored because OVS meters are not supported.") } + + eventBroadcaster := record.NewBroadcaster() + recorder := eventBroadcaster.NewRecorder( + scheme.Scheme, + corev1.EventSource{Component: controllerName}, + ) + c := &EgressController{ ofClient: ofClient, routeClient: routeClient, + k8sClient: k8sClient, antreaClientProvider: antreaClientGetter, crdClient: crdClient, queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "egressgroup"), @@ -220,6 +236,8 @@ func NewEgressController( serviceCIDRUpdateRetryDelay: 10 * time.Second, trafficShapingEnabled: openflow.OVSMetersAreSupported() && trafficShapingEnabled, + + record: recorder, } ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice) if err != nil { @@ -388,6 +406,17 @@ func (c *EgressController) Run(stopCh <-chan struct{}) { klog.Infof("Starting %s", controllerName) defer klog.Infof("Shutting down %s", controllerName) + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartStructuredLogging(0) + eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{ + Interface: c.k8sClient.CoreV1().Events(""), + }) + c.record = eventBroadcaster.NewRecorder( + scheme.Scheme, + corev1.EventSource{Component: controllerName}, + ) + defer eventBroadcaster.Shutdown() + go c.localIPDetector.Run(stopCh) go c.egressIPScheduler.Run(stopCh) go c.ipAssigner.Run(stopCh) @@ -848,14 +877,22 @@ func (c *EgressController) syncEgress(egressName string) error { // Ensure the Egress IP is assigned to the system. Force advertising the IP if it was previously assigned to // another Node in the Egress API. This could force refreshing other peers' neighbor cache when the Egress IP is // obtained by this Node and another Node at the same time in some situations, e.g. split brain. - if err := c.ipAssigner.AssignIP(desiredEgressIP, egress.Status.EgressNode != c.nodeName); err != nil { + exists, err := c.ipAssigner.AssignIP(desiredEgressIP, egress.Status.EgressNode != c.nodeName) + if err != nil { return err } + if exists { + c.record.Eventf(egress, corev1.EventTypeNormal, "IPAssigned", "Assigned Egress %s with IP %s on Node %s", egress.Name, desiredEgressIP, desiredNode) + } } else { // Unassign the Egress IP from the local Node if it was assigned by the agent. - if err := c.ipAssigner.UnassignIP(desiredEgressIP); err != nil { + unAssigned, err := c.ipAssigner.UnassignIP(desiredEgressIP) + if err != nil { return err } + if unAssigned { + c.record.Eventf(egress, corev1.EventTypeNormal, "IPUnassigned", "Unassigned Egress %s with IP %s", egress.Name, egress.Spec.EgressIP) + } } // Realize the latest EgressIP and get the desired mark. @@ -951,7 +988,7 @@ func (c *EgressController) uninstallEgress(egressName string, eState *egressStat } } // Unassign the Egress IP from the local Node if it was assigned by the agent. - if err := c.ipAssigner.UnassignIP(eState.egressIP); err != nil { + if _, err := c.ipAssigner.UnassignIP(eState.egressIP); err != nil { return err } // Remove the Egress's state. diff --git a/pkg/agent/controller/egress/egress_controller_test.go b/pkg/agent/controller/egress/egress_controller_test.go index 5f347bf04c7..256358f0a2d 100644 --- a/pkg/agent/controller/egress/egress_controller_test.go +++ b/pkg/agent/controller/egress/egress_controller_test.go @@ -182,6 +182,7 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll mockServiceCIDRProvider := servicecidrtest.NewMockInterface(controller) mockServiceCIDRProvider.EXPECT().AddEventHandler(gomock.Any()) egressController, _ := NewEgressController(mockOFClient, + k8sClient, &antreaClientGetter{clientset}, crdClient, ifaceStore, diff --git a/pkg/agent/controller/serviceexternalip/controller.go b/pkg/agent/controller/serviceexternalip/controller.go index 31d108eb8fb..55d1c3cde58 100644 --- a/pkg/agent/controller/serviceexternalip/controller.go +++ b/pkg/agent/controller/serviceexternalip/controller.go @@ -393,7 +393,7 @@ func (c *ServiceExternalIPController) assignIP(ip string, service apimachineryty c.assignedIPsMutex.Lock() defer c.assignedIPsMutex.Unlock() if _, ok := c.assignedIPs[ip]; !ok { - if err := c.ipAssigner.AssignIP(ip, true); err != nil { + if _, err := c.ipAssigner.AssignIP(ip, true); err != nil { return err } c.assignedIPs[ip] = sets.New[string](service.String()) @@ -411,7 +411,7 @@ func (c *ServiceExternalIPController) unassignIP(ip string, service apimachinery return nil } if assigned.Len() == 1 && assigned.Has(service.String()) { - if err := c.ipAssigner.UnassignIP(ip); err != nil { + if _, err := c.ipAssigner.UnassignIP(ip); err != nil { return err } delete(c.assignedIPs, ip) diff --git a/pkg/agent/ipassigner/ip_assigner.go b/pkg/agent/ipassigner/ip_assigner.go index 8bfc681fd7b..89e73ae33f2 100644 --- a/pkg/agent/ipassigner/ip_assigner.go +++ b/pkg/agent/ipassigner/ip_assigner.go @@ -19,9 +19,14 @@ import "k8s.io/apimachinery/pkg/util/sets" // IPAssigner provides methods to assign or unassign IP. type IPAssigner interface { // AssignIP ensures the provided IP is assigned to the system. - AssignIP(ip string, forceAdvertise bool) error + // It returns True only in the case when there is no error and the IP provided + // is not assigned to the inetrface before the operation, in all other cases it + // return false. + AssignIP(ip string, forceAdvertise bool) (bool, error) // UnassignIP ensures the provided IP is not assigned to the system. - UnassignIP(ip string) error + // It returns True only in the case when there is no error and the IP provided + // is getting unassigned during the function call. + UnassignIP(ip string) (bool, error) // AssignedIPs return the IPs that are assigned to the system by this IPAssigner. AssignedIPs() sets.Set[string] // InitIPs ensures the IPs that are assigned to the system match the given IPs. diff --git a/pkg/agent/ipassigner/ip_assigner_linux.go b/pkg/agent/ipassigner/ip_assigner_linux.go index fa6946338a0..49e5e13f5c4 100644 --- a/pkg/agent/ipassigner/ip_assigner_linux.go +++ b/pkg/agent/ipassigner/ip_assigner_linux.go @@ -143,10 +143,10 @@ func (a *ipAssigner) loadIPAddresses() (sets.Set[string], error) { } // AssignIP ensures the provided IP is assigned to the dummy device and the ARP/NDP responders. -func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error { +func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) (bool, error) { parsedIP := net.ParseIP(ip) if parsedIP == nil { - return fmt.Errorf("invalid IP %s", ip) + return false, fmt.Errorf("invalid IP %s", ip) } a.mutex.Lock() defer a.mutex.Unlock() @@ -156,14 +156,14 @@ func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error { if forceAdvertise { a.advertise(parsedIP) } - return nil + return false, nil } if a.dummyDevice != nil { addr := util.NewIPNet(parsedIP) if err := netlink.AddrAdd(a.dummyDevice, &netlink.Addr{IPNet: addr}); err != nil { if !errors.Is(err, unix.EEXIST) { - return fmt.Errorf("failed to add IP %v to interface %s: %v", ip, a.dummyDevice.Attrs().Name, err) + return false, fmt.Errorf("failed to add IP %v to interface %s: %v", ip, a.dummyDevice.Attrs().Name, err) } else { klog.InfoS("IP was already assigned to interface", "ip", parsedIP, "interface", a.dummyDevice.Attrs().Name) } @@ -174,18 +174,18 @@ func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error { if utilnet.IsIPv4(parsedIP) && a.arpResponder != nil { if err := a.arpResponder.AddIP(parsedIP); err != nil { - return fmt.Errorf("failed to assign IP %v to ARP responder: %v", ip, err) + return false, fmt.Errorf("failed to assign IP %v to ARP responder: %v", ip, err) } } if utilnet.IsIPv6(parsedIP) && a.ndpResponder != nil { if err := a.ndpResponder.AddIP(parsedIP); err != nil { - return fmt.Errorf("failed to assign IP %v to NDP responder: %v", ip, err) + return false, fmt.Errorf("failed to assign IP %v to NDP responder: %v", ip, err) } } // Always advertise the IP when the IP is newly assigned to this Node. a.advertise(parsedIP) a.assignedIPs.Insert(ip) - return nil + return true, nil } func (a *ipAssigner) advertise(ip net.IP) { @@ -203,24 +203,24 @@ func (a *ipAssigner) advertise(ip net.IP) { } // UnassignIP ensures the provided IP is not assigned to the dummy device. -func (a *ipAssigner) UnassignIP(ip string) error { +func (a *ipAssigner) UnassignIP(ip string) (bool, error) { parsedIP := net.ParseIP(ip) if parsedIP == nil { - return fmt.Errorf("invalid IP %s", ip) + return false, fmt.Errorf("invalid IP %s", ip) } a.mutex.Lock() defer a.mutex.Unlock() if !a.assignedIPs.Has(ip) { klog.V(2).InfoS("The IP is not assigned", "ip", ip) - return nil + return false, nil } if a.dummyDevice != nil { addr := util.NewIPNet(parsedIP) if err := netlink.AddrDel(a.dummyDevice, &netlink.Addr{IPNet: addr}); err != nil { if !errors.Is(err, unix.EADDRNOTAVAIL) { - return fmt.Errorf("failed to delete IP %v from interface %s: %v", ip, a.dummyDevice.Attrs().Name, err) + return false, fmt.Errorf("failed to delete IP %v from interface %s: %v", ip, a.dummyDevice.Attrs().Name, err) } else { klog.InfoS("IP does not exist on interface", "ip", parsedIP, "interface", a.dummyDevice.Attrs().Name) } @@ -230,17 +230,17 @@ func (a *ipAssigner) UnassignIP(ip string) error { if utilnet.IsIPv4(parsedIP) && a.arpResponder != nil { if err := a.arpResponder.RemoveIP(parsedIP); err != nil { - return fmt.Errorf("failed to remove IP %v from ARP responder: %v", ip, err) + return false, fmt.Errorf("failed to remove IP %v from ARP responder: %v", ip, err) } } if utilnet.IsIPv6(parsedIP) && a.ndpResponder != nil { if err := a.ndpResponder.RemoveIP(parsedIP); err != nil { - return fmt.Errorf("failed to remove IP %v from NDP responder: %v", ip, err) + return false, fmt.Errorf("failed to remove IP %v from NDP responder: %v", ip, err) } } a.assignedIPs.Delete(ip) - return nil + return true, nil } // AssignedIPs return the IPs that are assigned to the dummy device. diff --git a/pkg/agent/ipassigner/testing/mock_ipassigner.go b/pkg/agent/ipassigner/testing/mock_ipassigner.go index 57d42d7d8f8..3406707fdaf 100644 --- a/pkg/agent/ipassigner/testing/mock_ipassigner.go +++ b/pkg/agent/ipassigner/testing/mock_ipassigner.go @@ -54,11 +54,12 @@ func (m *MockIPAssigner) EXPECT() *MockIPAssignerMockRecorder { } // AssignIP mocks base method. -func (m *MockIPAssigner) AssignIP(arg0 string, arg1 bool) error { +func (m *MockIPAssigner) AssignIP(arg0 string, arg1 bool) (bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AssignIP", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 } // AssignIP indicates an expected call of AssignIP. @@ -108,11 +109,12 @@ func (mr *MockIPAssignerMockRecorder) Run(arg0 any) *gomock.Call { } // UnassignIP mocks base method. -func (m *MockIPAssigner) UnassignIP(arg0 string) error { +func (m *MockIPAssigner) UnassignIP(arg0 string) (bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UnassignIP", arg0) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 } // UnassignIP indicates an expected call of UnassignIP. diff --git a/test/e2e/egress_test.go b/test/e2e/egress_test.go index 230013ddae5..ddb5634948e 100644 --- a/test/e2e/egress_test.go +++ b/test/e2e/egress_test.go @@ -35,6 +35,7 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/apis/crd/v1beta1" + "antrea.io/antrea/pkg/client/clientset/versioned/scheme" "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/util/k8s" ) @@ -406,6 +407,11 @@ func testEgressCRUD(t *testing.T, data *TestData) { exists, err := hasIP(data, egress.Status.EgressNode, egress.Spec.EgressIP) require.NoError(t, err, "Failed to check if IP exists on Node") assert.True(t, exists, "Didn't find desired IP on Node") + // Testing the events recorded during creation of an Egress resource. + expectedMessage := fmt.Sprintf("Assigned Egress %s with IP %s on Node %v", egress.Name, tt.expectedEgressIP, egress.Status.EgressNode) + events, err := data.clientset.CoreV1().Events("").Search(scheme.Scheme, egress) + require.NoError(t, err) + assert.Contains(t, events.Items[0].Message, expectedMessage) } checkEIPStatus := func(expectedUsed int) { diff --git a/test/integration/agent/ip_assigner_linux_test.go b/test/integration/agent/ip_assigner_linux_test.go index 77368e18130..8c161f40756 100644 --- a/test/integration/agent/ip_assigner_linux_test.go +++ b/test/integration/agent/ip_assigner_linux_test.go @@ -40,7 +40,7 @@ func TestIPAssigner(t *testing.T) { require.NoError(t, err, "Failed to find the dummy device") defer netlink.LinkDel(dummyDevice) - err = ipAssigner.AssignIP("x", false) + _, err = ipAssigner.AssignIP("x", false) assert.Error(t, err, "Assigning an invalid IP should fail") ip1 := "10.10.10.10" @@ -49,7 +49,7 @@ func TestIPAssigner(t *testing.T) { desiredIPs := sets.New[string](ip1, ip2, ip3) for ip := range desiredIPs { - errAssign := ipAssigner.AssignIP(ip, false) + _, errAssign := ipAssigner.AssignIP(ip, false) cmd := exec.Command("ip", "addr") out, err := cmd.CombinedOutput() if err != nil { @@ -79,7 +79,7 @@ func TestIPAssigner(t *testing.T) { assert.Equal(t, newDesiredIPs, actualIPs, "Actual IPs don't match") for ip := range newDesiredIPs { - err = newIPAssigner.UnassignIP(ip) + _, err = newIPAssigner.UnassignIP(ip) assert.NoError(t, err, "Failed to unassign a valid IP") } assert.Equal(t, sets.New[string](), newIPAssigner.AssignedIPs(), "Assigned IPs don't match")