diff --git a/pkg/agent/nodeportlocal/k8s/npl_controller.go b/pkg/agent/nodeportlocal/k8s/npl_controller.go index f0a17ab5d3a..2e2a4eb56aa 100644 --- a/pkg/agent/nodeportlocal/k8s/npl_controller.go +++ b/pkg/agent/nodeportlocal/k8s/npl_controller.go @@ -493,9 +493,13 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { } podPorts[targetPortProto] = struct{}{} portData := c.portTable.GetEntry(podIP, port, protocol) - if portData != nil && !portData.ProtocolInUse(protocol) { - // If the PortTable has an entry for the Pod but does not have an - // entry with protocol, we enforce AddRule for the missing Protocol. + // Special handling for a rule that was previously marked for deletion but could not + // be deleted properly: we have to retry now. + if portData != nil && portData.Defunct() { + klog.InfoS("Deleting defunct rule for Pod to prevent re-use", "pod", klog.KObj(pod), "podIP", podIP, "port", port, "protocol", protocol) + if err := c.portTable.DeleteRule(podIP, port, protocol); err != nil { + return fmt.Errorf("failed to delete defunct rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, port, protocol, err) + } portData = nil } if portData == nil { @@ -527,13 +531,11 @@ func (c *NPLController) handleAddUpdatePod(key string, obj interface{}) error { // second, delete any existing rule that is not needed based on the current Pod // specification. entries := c.portTable.GetDataForPodIP(podIP) - if nplExists { - for _, data := range entries { - proto := data.Protocol - if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists { - if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil { - return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %v", podIP, data.PodPort, proto.Protocol, err) - } + for _, data := range entries { + proto := data.Protocol + if _, exists := podPorts[util.BuildPortProto(fmt.Sprint(data.PodPort), proto.Protocol)]; !exists { + if err := c.portTable.DeleteRule(podIP, int(data.PodPort), proto.Protocol); err != nil { + return fmt.Errorf("failed to delete rule for Pod IP %s, Pod Port %d, Protocol %s: %w", podIP, data.PodPort, proto.Protocol, err) } } } diff --git a/pkg/agent/nodeportlocal/npl_agent_test.go b/pkg/agent/nodeportlocal/npl_agent_test.go index c9c92fabb7f..bf0cc74a2d9 100644 --- a/pkg/agent/nodeportlocal/npl_agent_test.go +++ b/pkg/agent/nodeportlocal/npl_agent_test.go @@ -23,6 +23,7 @@ import ( "fmt" "os" "sync" + "sync/atomic" "testing" "time" @@ -176,11 +177,12 @@ func getTestSvcWithPortName(portName string) *corev1.Service { type testData struct { *testing.T - stopCh chan struct{} - ctrl *gomock.Controller - k8sClient *k8sfake.Clientset - portTable *portcache.PortTable - wg sync.WaitGroup + stopCh chan struct{} + ctrl *gomock.Controller + k8sClient *k8sfake.Clientset + portTable *portcache.PortTable + svcInformer cache.SharedIndexInformer + wg sync.WaitGroup } func (t *testData) runWrapper(c *k8s.NPLController) { @@ -234,22 +236,18 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData { mockPortOpener.EXPECT().OpenLocalPort(gomock.Any(), gomock.Any()).AnyTimes().Return(&fakeSocket{}, nil) } - data := &testData{ - T: t, - stopCh: make(chan struct{}), - ctrl: mockCtrl, - k8sClient: k8sfake.NewSimpleClientset(objects...), - portTable: newPortTable(mockIPTables, mockPortOpener), - } + k8sClient := k8sfake.NewSimpleClientset(objects...) + + portTable := newPortTable(mockIPTables, mockPortOpener) resyncPeriod := 0 * time.Minute // informerFactory is initialized and started from cmd/antrea-agent/agent.go - informerFactory := informers.NewSharedInformerFactory(data.k8sClient, resyncPeriod) + informerFactory := informers.NewSharedInformerFactory(k8sClient, resyncPeriod) listOptions := func(options *metav1.ListOptions) { options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", defaultNodeName).String() } localPodInformer := coreinformers.NewFilteredPodInformer( - data.k8sClient, + k8sClient, metav1.NamespaceAll, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, // NamespaceIndex is used in NPLController. @@ -257,7 +255,16 @@ func setUp(t *testing.T, tc *testConfig, objects ...runtime.Object) *testData { ) svcInformer := informerFactory.Core().V1().Services().Informer() - c := k8s.NewNPLController(data.k8sClient, localPodInformer, svcInformer, data.portTable, defaultNodeName) + c := k8s.NewNPLController(k8sClient, localPodInformer, svcInformer, portTable, defaultNodeName) + + data := &testData{ + T: t, + stopCh: make(chan struct{}), + ctrl: mockCtrl, + k8sClient: k8sClient, + portTable: portTable, + svcInformer: svcInformer, + } data.runWrapper(c) informerFactory.Start(data.stopCh) @@ -305,31 +312,41 @@ func (t *testData) tearDown() { os.Unsetenv("NODE_NAME") } -func (t *testData) pollForPodAnnotation(podName string, found bool) ([]types.NPLAnnotation, error) { - var data string - var exists bool +func conditionMatchAll([]types.NPLAnnotation) bool { + return true +} + +// If conditionFn is nil, we will assume you are looking for a non-existing annotation. +// If you want to match all, use conditionMatchAll as the conditionFn. +func (t *testData) pollForPodAnnotationWithCondition(podName string, conditionFn func([]types.NPLAnnotation) bool) ([]types.NPLAnnotation, error) { + var nplValue []types.NPLAnnotation // do not use PollImmediate: 1 second is reserved for the controller to do his job and // update Pod NPL annotations as needed. err := wait.Poll(time.Second, 20*time.Second, func() (bool, error) { updatedPod, err := t.k8sClient.CoreV1().Pods(defaultNS).Get(context.TODO(), podName, metav1.GetOptions{}) require.NoError(t, err, "Failed to get Pod") annotation := updatedPod.GetAnnotations() - data, exists = annotation[types.NPLAnnotationKey] - if found { - return exists, nil + data, exists := annotation[types.NPLAnnotationKey] + if !exists { + return conditionFn == nil, nil } - return !exists, nil + if conditionFn == nil { + return false, nil + } + if err := json.Unmarshal([]byte(data), &nplValue); err != nil { + return false, err + } + return conditionFn(nplValue), nil }) + return nplValue, err +} - if err != nil { - return []types.NPLAnnotation{}, err - } - if data == "" { - return []types.NPLAnnotation{}, nil +func (t *testData) pollForPodAnnotation(podName string, found bool) ([]types.NPLAnnotation, error) { + var conditionFn func([]types.NPLAnnotation) bool + if found { + conditionFn = conditionMatchAll } - var nplValue []types.NPLAnnotation - err = json.Unmarshal([]byte(data), &nplValue) - return nplValue, err + return t.pollForPodAnnotationWithCondition(podName, conditionFn) } func (t *testData) updateServiceOrFail(testSvc *corev1.Service) { @@ -497,6 +514,7 @@ func TestPodDelete(t *testing.T) { // TestPodAddMultiPort creates a Pod and a Service with two target ports. // It verifies that the Pod's NPL annotation and the local port table are updated with both ports. +// It then updates the Service to remove one of the target ports. func TestAddMultiPortPodSvc(t *testing.T) { newPort := 90 testSvc := getTestSvc(defaultPort, int32(newPort)) @@ -510,6 +528,16 @@ func TestAddMultiPortPodSvc(t *testing.T) { expectedAnnotations.Check(t, value) assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + + // Remove the second target port. + testSvc.Spec.Ports = testSvc.Spec.Ports[:1] + testData.updateServiceOrFail(testSvc) + // Wait for annotation to be updated (single mapping). + value, err = testData.pollForPodAnnotationWithCondition(testPod.Name, func(value []types.NPLAnnotation) bool { return len(value) == 1 }) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotations = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) + expectedAnnotations.Check(t, value) + assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) } // TestPodAddMultiPort creates a Pod with multiple ports and a Service with only one target port. @@ -807,3 +835,106 @@ func TestSyncRulesError(t *testing.T) { testData, _, _ := setUpWithTestServiceAndPod(t, testConfig, nil) defer testData.tearDown() } + +func TestSingleRuleDeletionError(t *testing.T) { + newPort := 90 + testSvc := getTestSvc(defaultPort, int32(newPort)) + testPod := getTestPod() + + testConfig := newTestConfig().withCustomPodPortRulesExpectations(func(mockIPTables *rulestesting.MockPodPortRules) { + mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes() + gomock.InOrder( + mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2), + mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("iptables failure")), + mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()), + ) + }) + + testData := setUp(t, testConfig, testSvc, testPod) + defer testData.tearDown() + + value, err := testData.pollForPodAnnotation(testPod.Name, true) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP) + expectedAnnotations.Check(t, value) + assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + + // Remove the second target port, to force one mapping to be deleted. + testSvc.Spec.Ports = testSvc.Spec.Ports[:1] + testData.updateServiceOrFail(testSvc) + // The first deletion attempt will fail, but the second should succeed. + // Wait for annotation to be updated (single mapping). + value, err = testData.pollForPodAnnotationWithCondition(testPod.Name, func(value []types.NPLAnnotation) bool { return len(value) == 1 }) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotations = newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP) + expectedAnnotations.Check(t, value) + assert.False(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) +} + +func TestPreventDefunctRuleReuse(t *testing.T) { + newPort := 90 + testSvc := getTestSvc(defaultPort, int32(newPort)) + testPod := getTestPod() + + var testData *testData + + ports := testSvc.Spec.Ports + // This function will be executed synchronously when DeleteRule is called for the first time + // and we simulate a failure. It restores the second target port for the Service, which was + // deleted previously, and waits for the change to be reflected in the informer's + // store. After that, we know that the next time the NPL controller processes the test Pod, + // it will need to ensure that both NPL mappings are configured correctly. Because one of + // the rules will be marked as "defunct", it will first need to delete the rule properly + // before adding it back. + restoreServiceTargetPorts := func() { + testSvc.Spec.Ports = ports + _, err := testData.k8sClient.CoreV1().Services(defaultNS).Update(context.TODO(), testSvc, metav1.UpdateOptions{}) + if !assert.NoError(t, err) { + return + } + assert.EventuallyWithT(t, func(c *assert.CollectT) { + obj, exists, err := testData.svcInformer.GetIndexer().GetByKey(testSvc.Namespace + "/" + testSvc.Name) + if !assert.NoError(t, err) || !assert.True(t, exists) { + return + } + svc := obj.(*corev1.Service) + assert.Len(t, svc.Spec.Ports, 2) + }, 2*time.Second, 50*time.Millisecond) + } + + var done atomic.Bool + + testConfig := newTestConfig().withCustomPodPortRulesExpectations(func(mockIPTables *rulestesting.MockPodPortRules) { + mockIPTables.EXPECT().AddAllRules(gomock.Any()).AnyTimes() + gomock.InOrder( + mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2), + mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(nodePort int, podIP string, podPort int, protocol string) { restoreServiceTargetPorts() }, + ).Return(fmt.Errorf("iptables failure")), + mockIPTables.EXPECT().DeleteRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()), + mockIPTables.EXPECT().AddRule(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(nodePort int, podIP string, podPort int, protocol string) { done.Store(true) }, + ), + ) + }) + + testData = setUp(t, testConfig, testSvc, testPod) + defer testData.tearDown() + + value, err := testData.pollForPodAnnotation(testPod.Name, true) + require.NoError(t, err, "Poll for annotation check failed") + expectedAnnotations := newExpectedNPLAnnotations().Add(nil, defaultPort, protocolTCP).Add(nil, newPort, protocolTCP) + expectedAnnotations.Check(t, value) + assert.True(t, testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP)) + assert.True(t, testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP)) + + // Remove the second target port, to force one mapping to be deleted. + testSvc.Spec.Ports = testSvc.Spec.Ports[:1] + testData.updateServiceOrFail(testSvc) + + assert.Eventually(t, done.Load, 2*time.Second, 50*time.Millisecond) + assert.Eventually(t, func() bool { + return testData.portTable.RuleExists(defaultPodIP, defaultPort, protocolTCP) && testData.portTable.RuleExists(defaultPodIP, newPort, protocolTCP) + }, 2*time.Second, 50*time.Millisecond) +} diff --git a/pkg/agent/nodeportlocal/portcache/port_table.go b/pkg/agent/nodeportlocal/portcache/port_table.go index c2c99203eb1..b1856861bd9 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table.go +++ b/pkg/agent/nodeportlocal/portcache/port_table.go @@ -32,13 +32,8 @@ const ( PodIPIndex = "podIPIndex" ) -// protocolSocketState represents the state of the socket corresponding to a -// given (Node port, protocol) tuple. -type protocolSocketState int - type ProtocolSocketData struct { Protocol string - State protocolSocketState socket io.Closer } @@ -47,14 +42,13 @@ type NodePortData struct { PodPort int PodIP string Protocol ProtocolSocketData + // defunct is used to indicate that a rule has been partially deleted: it is no longer + // usable and deletion needs to be re-attempted. + defunct bool } -func (d *NodePortData) ProtocolInUse(protocol string) bool { - protocolSocketData := d.Protocol - if protocolSocketData.Protocol == protocol { - return protocolSocketData.State == stateInUse - } - return false +func (d *NodePortData) Defunct() bool { + return d.defunct } type LocalPortOpener interface { @@ -204,8 +198,8 @@ func podIPPortProtoFormat(ip string, port int, protocol string) string { } func (pt *PortTable) getEntryByPodIPPortProto(ip string, port int, protocol string) *NodePortData { - data, exists := pt.getPortTableCacheFromPodEndpointIndex(podIPPortProtoFormat(ip, port, protocol)) - if exists == false { + data, ok := pt.getPortTableCacheFromPodEndpointIndex(podIPPortProtoFormat(ip, port, protocol)) + if !ok { return nil } return data @@ -214,10 +208,8 @@ func (pt *PortTable) getEntryByPodIPPortProto(ip string, port int, protocol stri func (pt *PortTable) RuleExists(podIP string, podPort int, protocol string) bool { pt.tableLock.RLock() defer pt.tableLock.RUnlock() - if data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol); data != nil { - return data.ProtocolInUse(protocol) - } - return false + data := pt.getEntryByPodIPPortProto(podIP, podPort, protocol) + return data != nil } // nodePortProtoFormat formats the nodeport, protocol to string port:protocol. diff --git a/pkg/agent/nodeportlocal/portcache/port_table_others.go b/pkg/agent/nodeportlocal/portcache/port_table_others.go index 5e9d7b89035..ab73071073b 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_others.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_others.go @@ -26,18 +26,6 @@ import ( "antrea.io/antrea/pkg/agent/nodeportlocal/rules" ) -const ( - // stateOpen means that a listening socket has been opened for the - // protocol (as a means to reserve the port for this protocol), but no - // NPL rule has been installed for it. - stateOpen protocolSocketState = iota - // stateInUse means that a listening socket has been opened AND a NPL - // rule has been installed. - stateInUse - // stateClosed means that the socket has been closed. - stateClosed -) - func openSocketsForPort(localPortOpener LocalPortOpener, port int, protocol string) (ProtocolSocketData, error) { // Port only needs to be available for the protocol used by the NPL rule. // We don't need to allocate the same nodePort for all protocols anymore. @@ -48,7 +36,6 @@ func openSocketsForPort(localPortOpener LocalPortOpener, port int, protocol stri } protocolData := ProtocolSocketData{ Protocol: protocol, - State: stateInUse, socket: socket, } return protocolData, nil @@ -83,28 +70,6 @@ func (pt *PortTable) getFreePort(podIP string, podPort int, protocol string) (in return 0, ProtocolSocketData{}, fmt.Errorf("no free port found") } -func (d *NodePortData) CloseSockets() error { - if d.Protocol.Protocol != "" { - protocolSocketData := &d.Protocol - switch protocolSocketData.State { - case stateClosed: - // already closed - return nil - case stateInUse: - // should not happen - return fmt.Errorf("protocol %s is still in use, cannot release socket", protocolSocketData.Protocol) - case stateOpen: - if err := protocolSocketData.socket.Close(); err != nil { - return fmt.Errorf("error when releasing local port %d with protocol %s: %v", d.NodePort, protocolSocketData.Protocol, err) - } - protocolSocketData.State = stateClosed - default: - return fmt.Errorf("invalid protocol socket state") - } - } - return nil -} - func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, error) { pt.tableLock.Lock() defer pt.tableLock.Unlock() @@ -128,11 +93,38 @@ func (pt *PortTable) AddRule(podIP string, podPort int, protocol string) (int, e pt.addPortTableCache(npData) } else { // Only add rules if the entry does not exist. - return 0, fmt.Errorf("existed Linux Nodeport entry for %s:%d:%s", podIP, podPort, protocol) + return 0, fmt.Errorf("existing Linux Nodeport entry for %s:%d:%s", podIP, podPort, protocol) } return npData.NodePort, nil } +func (pt *PortTable) deleteRule(data *NodePortData) error { + protocolSocketData := &data.Protocol + protocol := protocolSocketData.Protocol + + // In theory, we should not be modifying a cache item in-place. However, the field we are + // modifying (defunct) does NOT participate in indexing and the modification is thread-safe + // because of pt.tableLock. + // TODO: stop modifying cache items in-place. + // We could set defunct after the call to DeleteRule, because a failed call to DeleteRule + // should mean that the rule is still present and valid, but there is no harm in being more + // conservative. + data.defunct = true + + // Calling DeleteRule is idempotent. + if err := pt.PodPortRules.DeleteRule(data.NodePort, data.PodIP, data.PodPort, protocol); err != nil { + return err + } + if err := protocolSocketData.socket.Close(); err != nil { + return fmt.Errorf("error when releasing local port %d with protocol %s: %w", data.NodePort, protocol, err) + } + // We don't need to delete cache from different indexes repeatedly because they map to the same entry. + // Deletion errors are not possible because our Index functions cannot return errors. + // See https://github.com/kubernetes/client-go/blob/3aa45779f2e5592d52edf68da66abfbd0805e413/tools/cache/store.go#L189-L196 + pt.deletePortTableCache(data) + return nil +} + func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) error { pt.tableLock.Lock() defer pt.tableLock.Unlock() @@ -141,15 +133,7 @@ func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) erro // Delete not required when the PortTable entry does not exist return nil } - if err := pt.PodPortRules.DeleteRule(data.NodePort, podIP, podPort, protocol); err != nil { - return err - } - if err := data.CloseSockets(); err != nil { - return err - } - // We don't need to delete cache from different indexes repeatedly because they map to the same entry. - pt.deletePortTableCache(data) - return nil + return pt.deleteRule(data) } func (pt *PortTable) DeleteRulesForPod(podIP string) error { @@ -157,14 +141,7 @@ func (pt *PortTable) DeleteRulesForPod(podIP string) error { defer pt.tableLock.Unlock() podEntries := pt.getDataForPodIP(podIP) for _, podEntry := range podEntries { - protocolSocketData := podEntry.Protocol - if err := pt.PodPortRules.DeleteRule(podEntry.NodePort, podIP, podEntry.PodPort, protocolSocketData.Protocol); err != nil { - return err - } - if err := protocolSocketData.socket.Close(); err != nil { - return fmt.Errorf("error when releasing local port %d with protocol %s: %v", podEntry.NodePort, protocolSocketData.Protocol, err) - } - pt.deletePortTableCache(podEntry) + return pt.deleteRule(podEntry) } return nil } @@ -177,17 +154,13 @@ func (pt *PortTable) syncRules() error { nplPorts := make([]rules.PodNodePort, 0, len(objs)) for _, obj := range objs { npData := obj.(*NodePortData) - protocols := make([]string, 0, 1) - protocol := npData.Protocol - if protocol.State == stateInUse { - protocols = append(protocols, protocol.Protocol) - } + protocol := npData.Protocol.Protocol nplPorts = append(nplPorts, rules.PodNodePort{ NodePort: npData.NodePort, PodPort: npData.PodPort, PodIP: npData.PodIP, - Protocol: protocols[0], - Protocols: protocols, + Protocol: protocol, + Protocols: []string{protocol}, }) } if err := pt.PodPortRules.AddAllRules(nplPorts); err != nil { diff --git a/pkg/agent/nodeportlocal/portcache/port_table_others_test.go b/pkg/agent/nodeportlocal/portcache/port_table_others_test.go index 95bc07c6f35..af2073cf0b0 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_others_test.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_others_test.go @@ -18,10 +18,13 @@ package portcache import ( + "fmt" "testing" "time" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" portcachetesting "antrea.io/antrea/pkg/agent/nodeportlocal/portcache/testing" "antrea.io/antrea/pkg/agent/nodeportlocal/rules" @@ -76,3 +79,56 @@ func TestRestoreRules(t *testing.T) { t.Fatalf("Rule restoration not complete after %v", timeout) } } + +type mockCloser struct { + closeErr error +} + +func (m *mockCloser) Close() error { + return m.closeErr +} + +func TestDeleteRule(t *testing.T) { + mockCtrl := gomock.NewController(t) + mockIPTables := rulestesting.NewMockPodPortRules(mockCtrl) + mockPortOpener := portcachetesting.NewMockLocalPortOpener(mockCtrl) + portTable := newPortTable(mockIPTables, mockPortOpener) + + const ( + podPort = 1001 + protocol = "tcp" + ) + + closer := &mockCloser{} + + data := &NodePortData{ + NodePort: nodePort1, + PodPort: podPort, + PodIP: podIP, + Protocol: ProtocolSocketData{ + Protocol: protocol, + socket: closer, + }, + } + + require.NoError(t, portTable.addPortTableCache(data)) + assert.False(t, data.Defunct()) + + mockIPTables.EXPECT().DeleteRule(nodePort1, podIP, podPort, protocol).Return(fmt.Errorf("iptables error")) + require.ErrorContains(t, portTable.DeleteRule(podIP, podPort, protocol), "iptables error") + + mockIPTables.EXPECT().DeleteRule(nodePort1, podIP, podPort, protocol) + closer.closeErr = fmt.Errorf("close error") + require.ErrorContains(t, portTable.DeleteRule(podIP, podPort, protocol), "close error") + assert.True(t, data.Defunct()) + + closer.closeErr = nil + + // First successful call to DeleteRule. + mockIPTables.EXPECT().DeleteRule(nodePort1, podIP, podPort, protocol) + assert.NoError(t, portTable.DeleteRule(podIP, podPort, protocol)) + + // Calling DeleteRule again will return immediately as the NodePortData entry has been + // removed from the cache. + assert.NoError(t, portTable.DeleteRule(podIP, podPort, protocol)) +} diff --git a/pkg/agent/nodeportlocal/portcache/port_table_windows.go b/pkg/agent/nodeportlocal/portcache/port_table_windows.go index aff2a32247e..e084bde1838 100644 --- a/pkg/agent/nodeportlocal/portcache/port_table_windows.go +++ b/pkg/agent/nodeportlocal/portcache/port_table_windows.go @@ -25,11 +25,6 @@ import ( "antrea.io/antrea/pkg/agent/nodeportlocal/rules" ) -const ( - // stateInUse means that the NPL rule has been installed. - stateInUse protocolSocketState = 1 -) - func addRuleForPort(podPortRules rules.PodPortRules, port int, podIP string, podPort int, protocol string) (ProtocolSocketData, error) { // Only the protocol used here should be returned if NetNatStaticMapping rule // can be inserted to an unused protocol port. @@ -40,7 +35,6 @@ func addRuleForPort(podPortRules rules.PodPortRules, port int, podIP string, pod } protocolData := ProtocolSocketData{ Protocol: protocol, - State: stateInUse, socket: nil, } return protocolData, nil @@ -137,6 +131,8 @@ func (pt *PortTable) DeleteRule(podIP string, podPort int, protocol string) erro return nil } + data.defunct = true + // Calling DeleteRule is idempotent. if err := pt.PodPortRules.DeleteRule(data.NodePort, podIP, podPort, protocol); err != nil { return err } diff --git a/test/e2e/nodeportlocal_test.go b/test/e2e/nodeportlocal_test.go index cd68300cc9a..ecf5735d407 100644 --- a/test/e2e/nodeportlocal_test.go +++ b/test/e2e/nodeportlocal_test.go @@ -88,7 +88,7 @@ func TestNodePortLocal(t *testing.T) { t.Run("testNPLChangePortRangeAgentRestart", func(t *testing.T) { testNPLChangePortRangeAgentRestart(t, data) }) } -func getNPLAnnotations(t *testing.T, data *TestData, r *require.Assertions, testPodName string, conditionFn func(types.NPLAnnotation) bool) ([]types.NPLAnnotation, string) { +func getNPLAnnotations(t *testing.T, data *TestData, r *require.Assertions, testPodName string, conditionFn func([]types.NPLAnnotation) bool) ([]types.NPLAnnotation, string) { var nplAnnotations []types.NPLAnnotation var testPodIP *PodIPs @@ -128,12 +128,8 @@ func getNPLAnnotations(t *testing.T, data *TestData, r *require.Assertions, test return false, nil } json.Unmarshal([]byte(nplAnn), &nplAnnotations) - if conditionFn != nil { - for _, annotation := range nplAnnotations { - if !conditionFn(annotation) { - return false, nil - } - } + if conditionFn != nil && !conditionFn(nplAnnotations) { + return false, nil } return found, nil }) @@ -494,13 +490,11 @@ func NPLTestPodAddMultiProtocol(t *testing.T, data *TestData) { Add(nil, 8080, "tcp").Add(nil, 8080, "udp") // Creating a Pod using agnhost image to support multiple protocols, instead of nginx. - cmd := []string{"/bin/bash", "-c"} - args := []string{ - fmt.Sprintf("/agnhost serve-hostname --udp --http=false --port %d & /agnhost serve-hostname --tcp --http=false --port %d", 8080, 8080), - } + + args := []string{"serve-hostname", "--tcp", "--udp", "--http=false", "--port=8080"} port := corev1.ContainerPort{ContainerPort: 8080} containerName := fmt.Sprintf("c%v", 8080) - err := NewPodBuilder(testPodName, data.testNamespace, agnhostImage).OnNode(serverNode).WithContainerName(containerName).WithCommand(cmd).WithArgs(args).WithPorts([]corev1.ContainerPort{port}).WithLabels(selector).Create(testData) + err := NewPodBuilder(testPodName, data.testNamespace, agnhostImage).OnNode(serverNode).WithContainerName(containerName).WithArgs(args).WithPorts([]corev1.ContainerPort{port}).WithLabels(selector).Create(testData) r.NoError(err, "Error creating test Pod: %v", err) nplAnnotations, testPodIP := getNPLAnnotations(t, testData, r, testPodName, nil) @@ -516,14 +510,26 @@ func NPLTestPodAddMultiProtocol(t *testing.T, data *TestData) { r.NoError(err, "Error when getting Antrea Agent Pod on Node '%s'", serverNode) checkNPLRules(t, testData, r, nplAnnotations, antreaPod, testPodIP, serverNode, true) + expectedAnnotations.Check(t, nplAnnotations) + checkTrafficForNPL(testData, r, nplAnnotations, clientName) + + // We now delete one of the Services, and we expect the corresponding NPL rule to be deleted. + testData.DeleteService(data.testNamespace, "agnhost2") + expectedAnnotations = newExpectedNPLAnnotations(defaultStartPort, defaultEndPort). + Add(nil, 8080, "tcp") + // Wait until we have only one NPL rule annotation. + conditionFn := func(annotations []types.NPLAnnotation) bool { + return len(annotations) == 1 + } + nplAnnotations, testPodIP = getNPLAnnotations(t, testData, r, testPodName, conditionFn) + checkNPLRules(t, testData, r, nplAnnotations, antreaPod, testPodIP, serverNode, true) expectedAnnotations.Check(t, nplAnnotations) checkTrafficForNPL(testData, r, nplAnnotations, clientName) testData.DeletePod(data.testNamespace, clientName) testData.DeletePod(data.testNamespace, testPodName) testData.DeleteService(data.testNamespace, "agnhost1") - testData.DeleteService(data.testNamespace, "agnhost2") checkNPLRules(t, testData, r, nplAnnotations, antreaPod, testPodIP, serverNode, false) } @@ -698,8 +704,11 @@ func testNPLChangePortRangeAgentRestart(t *testing.T, data *TestData) { } for _, testPodName := range testPods { - conditionFn := func(ann types.NPLAnnotation) bool { - return ann.NodePort >= updatedStartPort + conditionFn := func(annotations []types.NPLAnnotation) bool { + for idx := range annotations { + return annotations[idx].NodePort >= updatedStartPort + } + return true } nplAnnotations, testPodIP := getNPLAnnotations(t, data, r, testPodName, conditionFn)