From 07f6523840c5c3c45972cfc86742112265afe6bd Mon Sep 17 00:00:00 2001 From: Hongliang Liu <75655411+hongliangl@users.noreply.github.com> Date: Tue, 28 May 2024 22:48:33 +0800 Subject: [PATCH] Fix that AntreaProxy could unintentionally delete conntrack entries in zone 0 (#6193) This is a subsequent PR for #5112. As mentioned in #5112: > Due to the restriction of the go library 'netlink', there is no API to specify a target zone. As a result, when deleting a stale conntrack entry with a destination port (such as NodePort), not only will the conntrack entry whose destination port is the port added by AntreaProxy be deleted, but also the conntrack entry that is not added by AntreaProxy will be deleted. This behavior is unexpected, as only the conntrack entries added by AntreaProxy should be deleted. This PR resolves the issue by integrating a CT zone filter, now available in the latest Go library `netlink`. Leveraging this feature, AntreaProxy can accurately delete stale UDP conntrack entries. Signed-off-by: Hongliang Liu --- go.mod | 2 +- pkg/agent/route/route_linux.go | 27 +++++--- pkg/agent/route/route_linux_test.go | 85 ++++++++++++++++++++++-- test/integration/agent/cniserver_test.go | 3 +- 4 files changed, 99 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index d6440796f73..d660c4fe3ba 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 github.com/ti-mo/conntrack v0.5.0 - github.com/vishvananda/netlink v1.2.1-beta.2 + github.com/vishvananda/netlink v1.2.1-beta.2.0.20240523162130-1e68b2710dc3 github.com/vmware/go-ipfix v0.9.0 go.uber.org/mock v0.4.0 golang.org/x/crypto v0.22.0 diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 31890415aa5..4eb8a7fe22b 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -33,6 +33,7 @@ import ( utilnet "k8s.io/utils/net" "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/servicecidr" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util/ipset" @@ -278,7 +279,7 @@ func (c *Client) syncRoute() error { routeKeys := sets.New[routeKey]() for i := range routeList { r := &routeList[i] - if r.Dst == nil { + if r.Dst == nil || r.Dst.IP.IsUnspecified() { continue } routeKeys.Insert(routeKey{ @@ -1918,29 +1919,37 @@ func (c *Client) DeleteRouteForLink(cidr *net.IPNet, linkIndex int) error { func (c *Client) ClearConntrackEntryForService(svcIP net.IP, svcPort uint16, endpointIP net.IP, protocol binding.Protocol) error { var protoVar uint8 - var ipFamily netlink.InetFamily + var ipFamilyVar int + var zone uint16 switch protocol { case binding.ProtocolTCP: - ipFamily = unix.AF_INET + ipFamilyVar = unix.AF_INET protoVar = unix.IPPROTO_TCP + zone = openflow.CtZone case binding.ProtocolTCPv6: - ipFamily = unix.AF_INET6 + ipFamilyVar = unix.AF_INET6 protoVar = unix.IPPROTO_TCP + zone = openflow.CtZoneV6 case binding.ProtocolUDP: - ipFamily = unix.AF_INET + ipFamilyVar = unix.AF_INET protoVar = unix.IPPROTO_UDP + zone = openflow.CtZone case binding.ProtocolUDPv6: - ipFamily = unix.AF_INET6 + ipFamilyVar = unix.AF_INET6 protoVar = unix.IPPROTO_UDP + zone = openflow.CtZoneV6 case binding.ProtocolSCTP: - ipFamily = unix.AF_INET + ipFamilyVar = unix.AF_INET protoVar = unix.IPPROTO_SCTP + zone = openflow.CtZone case binding.ProtocolSCTPv6: - ipFamily = unix.AF_INET6 + ipFamilyVar = unix.AF_INET6 protoVar = unix.IPPROTO_SCTP + zone = openflow.CtZoneV6 } filter := &netlink.ConntrackFilter{} filter.AddProtocol(protoVar) + filter.AddZone(zone) if svcIP != nil { filter.AddIP(netlink.ConntrackOrigDstIP, svcIP) } @@ -1950,7 +1959,7 @@ func (c *Client) ClearConntrackEntryForService(svcIP net.IP, svcPort uint16, end if endpointIP != nil { filter.AddIP(netlink.ConntrackReplySrcIP, endpointIP) } - _, err := c.netlink.ConntrackDeleteFilter(netlink.ConntrackTable, ipFamily, filter) + _, err := c.netlink.ConntrackDeleteFilter(netlink.ConntrackTableType(netlink.ConntrackTable), netlink.InetFamily(ipFamilyVar), filter) return err } diff --git a/pkg/agent/route/route_linux_test.go b/pkg/agent/route/route_linux_test.go index ab266186509..a07db698003 100644 --- a/pkg/agent/route/route_linux_test.go +++ b/pkg/agent/route/route_linux_test.go @@ -23,9 +23,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/vishvananda/netlink" "go.uber.org/mock/gomock" + "golang.org/x/sys/unix" "k8s.io/apimachinery/pkg/util/sets" "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/openflow" servicecidrtest "antrea.io/antrea/pkg/agent/servicecidr/testing" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util/ipset" @@ -33,7 +35,7 @@ import ( "antrea.io/antrea/pkg/agent/util/iptables" iptablestest "antrea.io/antrea/pkg/agent/util/iptables/testing" netlinktest "antrea.io/antrea/pkg/agent/util/netlink/testing" - "antrea.io/antrea/pkg/ovs/openflow" + binding "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/ip" ) @@ -1322,7 +1324,7 @@ func TestAddNodePort(t *testing.T) { name string nodePortAddresses []net.IP port uint16 - protocol openflow.Protocol + protocol binding.Protocol expectedCalls func(ipset *ipsettest.MockInterfaceMockRecorder) }{ { @@ -1332,7 +1334,7 @@ func TestAddNodePort(t *testing.T) { net.ParseIP("1.1.2.2"), }, port: 30000, - protocol: openflow.ProtocolTCP, + protocol: binding.ProtocolTCP, expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { ipset.AddEntry(antreaNodePortIPSet, "1.1.1.1,tcp:30000") ipset.AddEntry(antreaNodePortIPSet, "1.1.2.2,tcp:30000") @@ -1345,7 +1347,7 @@ func TestAddNodePort(t *testing.T) { net.ParseIP("fd00:1234:5678:dead:beaf::2"), }, port: 30001, - protocol: openflow.ProtocolUDPv6, + protocol: binding.ProtocolUDPv6, expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { ipset.AddEntry(antreaNodePortIP6Set, "fd00:1234:5678:dead:beaf::1,udp:30001") ipset.AddEntry(antreaNodePortIP6Set, "fd00:1234:5678:dead:beaf::2,udp:30001") @@ -1368,7 +1370,7 @@ func TestDeleteNodePort(t *testing.T) { name string nodePortAddresses []net.IP port uint16 - protocol openflow.Protocol + protocol binding.Protocol expectedCalls func(ipset *ipsettest.MockInterfaceMockRecorder) }{ { @@ -1378,7 +1380,7 @@ func TestDeleteNodePort(t *testing.T) { net.ParseIP("1.1.2.2"), }, port: 30000, - protocol: openflow.ProtocolTCP, + protocol: binding.ProtocolTCP, expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { ipset.DelEntry(antreaNodePortIPSet, "1.1.1.1,tcp:30000") ipset.DelEntry(antreaNodePortIPSet, "1.1.2.2,tcp:30000") @@ -1391,7 +1393,7 @@ func TestDeleteNodePort(t *testing.T) { net.ParseIP("fd00:1234:5678:dead:beaf::2"), }, port: 30001, - protocol: openflow.ProtocolUDPv6, + protocol: binding.ProtocolUDPv6, expectedCalls: func(ipset *ipsettest.MockInterfaceMockRecorder) { ipset.DelEntry(antreaNodePortIP6Set, "fd00:1234:5678:dead:beaf::1,udp:30001") ipset.DelEntry(antreaNodePortIP6Set, "fd00:1234:5678:dead:beaf::2,udp:30001") @@ -2156,3 +2158,72 @@ COMMIT }) } } + +func TestClearConntrackEntryForService(t *testing.T) { + testCases := []struct { + name string + svcIP net.IP + svcPort uint16 + endpointIP net.IP + protocol binding.Protocol + expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder) + }{ + { + name: "TCPv4 with all parameters", + svcIP: net.ParseIP("192.168.1.1"), + svcPort: 80, + endpointIP: net.ParseIP("10.10.0.2"), + protocol: binding.ProtocolTCP, + expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { + filter := &netlink.ConntrackFilter{} + filter.AddProtocol(unix.IPPROTO_TCP) + filter.AddZone(openflow.CtZone) + filter.AddIP(netlink.ConntrackOrigDstIP, net.ParseIP("192.168.1.1")) + filter.AddPort(netlink.ConntrackOrigDstPort, 80) + filter.AddIP(netlink.ConntrackReplySrcIP, net.ParseIP("10.10.0.2")) + mockNetlink.ConntrackDeleteFilter(netlink.ConntrackTableType(netlink.ConntrackTable), netlink.InetFamily(unix.AF_INET), filter).Times(1) + }, + }, + { + name: "UDPv4 with svcIP and svcPort", + svcIP: net.ParseIP("192.168.1.1"), + svcPort: 53, + endpointIP: nil, + protocol: binding.ProtocolUDP, + expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { + filter := &netlink.ConntrackFilter{} + filter.AddProtocol(unix.IPPROTO_UDP) + filter.AddZone(openflow.CtZone) + filter.AddIP(netlink.ConntrackOrigDstIP, net.ParseIP("192.168.1.1")) + filter.AddPort(netlink.ConntrackOrigDstPort, 53) + mockNetlink.ConntrackDeleteFilter(netlink.ConntrackTableType(netlink.ConntrackTable), netlink.InetFamily(unix.AF_INET), filter).Times(1) + }, + }, + { + name: "SCTPv6 with endpointIP", + svcIP: nil, + svcPort: 0, + endpointIP: net.ParseIP("fec0::2"), + protocol: binding.ProtocolSCTPv6, + expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { + filter := &netlink.ConntrackFilter{} + filter.AddProtocol(unix.IPPROTO_SCTP) + filter.AddZone(openflow.CtZoneV6) + filter.AddIP(netlink.ConntrackReplySrcIP, net.ParseIP("fec0::2")) + mockNetlink.ConntrackDeleteFilter(netlink.ConntrackTableType(netlink.ConntrackTable), netlink.InetFamily(unix.AF_INET6), filter).Times(1) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockNetlink := netlinktest.NewMockInterface(ctrl) + c := &Client{ + netlink: mockNetlink, + } + tc.expectedCalls(mockNetlink.EXPECT()) + assert.NoError(t, c.ClearConntrackEntryForService(tc.svcIP, tc.svcPort, tc.endpointIP, tc.protocol)) + }) + } +} diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 290eb3cdbeb..5956b6ab343 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -336,7 +336,8 @@ func matchRoute(expectedCIDR string, routes []netlink.Route) (*netlink.Route, er return nil, err } for _, route := range routes { - if route.Dst == nil && route.Src == nil && route.Gw.Equal(gwIP) { + // For default route, `Dst` is 0.0.0.0/0 or ::/0, rather than nil. + if route.Dst != nil && route.Dst.IP.IsUnspecified() && route.Src == nil && route.Gw.Equal(gwIP) { return &route, nil } }