diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 7c663f57e89..a912db846a6 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -818,7 +818,9 @@ func run(o *Options) error { validator, networkConfig.TrafficEncapMode.SupportsEncap(), nodeInformer, - enableBridgingMode) + enableBridgingMode, + v4Enabled, + v6Enabled) if err := mcastController.Initialize(); err != nil { return err } diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index 9de45eff092..bdaa3afd2d9 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -15,6 +15,7 @@ package multicast import ( + "fmt" "net" "sync" "time" @@ -265,6 +266,13 @@ type Controller struct { installedNodes sets.Set[string] encapEnabled bool flexibleIPAMEnabled bool + // ipv4Enabled is the flag that if it is running on IPv4 cluster. An error is returned if IPv4Enabled is false + // in Initialize as Multicast does not support IPv6 for now. + // TODO: remove this flag after IPv6 is supported in Multicast. + ipv4Enabled bool + // ipv6Enabled is the flag that if it is running on IPv6 cluster. + // TODO: remove this flag after IPv6 is supported in Multicast. + ipv6Enabled bool } func NewMulticastController(ofClient openflow.Client, @@ -279,7 +287,9 @@ func NewMulticastController(ofClient openflow.Client, validator types.McastNetworkPolicyController, isEncap bool, nodeInformer coreinformers.NodeInformer, - enableFlexibleIPAM bool) *Controller { + enableFlexibleIPAM bool, + ipv4Enabled bool, + ipv6Enabled bool) *Controller { eventCh := make(chan *mcastGroupEvent, workerCount) groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, igmpQueryVersions, validator, isEncap) groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ @@ -303,6 +313,8 @@ func NewMulticastController(ofClient openflow.Client, queryGroupId: v4GroupAllocator.Allocate(), encapEnabled: isEncap, flexibleIPAMEnabled: enableFlexibleIPAM, + ipv4Enabled: ipv4Enabled, + ipv6Enabled: ipv6Enabled, } if isEncap { c.nodeGroupID = v4GroupAllocator.Allocate() @@ -331,6 +343,11 @@ func NewMulticastController(ofClient openflow.Client, } func (c *Controller) Initialize() error { + if !c.ipv4Enabled { + return fmt.Errorf("multicast is not supported on the IPv6-only cluster") + } else if c.ipv6Enabled { + klog.InfoS("Multicast only works with IPv4 traffic on a dual-stack cluster") + } err := c.mRouteClient.Initialize() if err != nil { return err diff --git a/pkg/agent/multicast/mcast_controller_test.go b/pkg/agent/multicast/mcast_controller_test.go index 2956e6d1ea7..8ffe39588f3 100644 --- a/pkg/agent/multicast/mcast_controller_test.go +++ b/pkg/agent/multicast/mcast_controller_test.go @@ -1255,7 +1255,7 @@ func newMockMulticastController(t *testing.T, isEncap bool, enableFlexibleIPAM b clientset = fake.NewSimpleClientset() informerFactory = informers.NewSharedInformerFactory(clientset, 12*time.Hour) nodeInformer := informerFactory.Core().V1().Nodes() - mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, nodeInformer, enableFlexibleIPAM) + mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, nodeInformer, enableFlexibleIPAM, true, false) return mctrl } @@ -1265,6 +1265,36 @@ func TestFlexibleIPAMModeInitialize(t *testing.T) { assert.NoError(t, err) } +func TestMulticastControllerOnIPv6Cluster(t *testing.T) { + for _, tc := range []struct { + name string + ipv4Enabled bool + ipv6Enabled bool + expErr error + }{ + { + name: "Failed on IPv6 only cluster", + ipv4Enabled: false, + ipv6Enabled: true, + expErr: fmt.Errorf("multicast is not supported on the IPv6-only cluster"), + }, + { + name: "Succeeded on dual-stack cluster", + ipv4Enabled: true, + ipv6Enabled: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + mockController := newMockMulticastController(t, true, false) + mockController.ipv4Enabled = tc.ipv4Enabled + mockController.ipv6Enabled = tc.ipv6Enabled + err := mockController.Initialize() + assert.Equal(t, err, tc.expErr) + }) + + } +} + func (c *Controller) initialize(t *testing.T) error { mockOFClient.EXPECT().InstallMulticastGroup(c.queryGroupId, gomock.Any(), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any()) diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index f18d5e4d424..f7c18f94b59 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -128,8 +128,6 @@ type Client struct { nodePortsIPv6 sync.Map // clusterNodeIPs stores the IPv4 of all other Nodes in the cluster clusterNodeIPs sync.Map - // clusterNodeIP6s stores the IPv6 of all other Nodes in the cluster - clusterNodeIP6s sync.Map // egressRoutes caches ip routes about Egresses. egressRoutes sync.Map // The latest calculated Service CIDRs can be got from serviceCIDRProvider. @@ -418,26 +416,18 @@ func (c *Client) syncIPSet() error { } if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() { - if err := c.ipset.CreateIPSet(clusterNodeIPSet, ipset.HashIP, false); err != nil { - return err - } - if err := c.ipset.CreateIPSet(clusterNodeIP6Set, ipset.HashIP, true); err != nil { - return err - } - c.clusterNodeIPs.Range(func(_, v interface{}) bool { - ipsetEntry := v.(string) - if err := c.ipset.AddEntry(clusterNodeIPSet, ipsetEntry); err != nil { - return false - } - return true - }) - c.clusterNodeIP6s.Range(func(_, v interface{}) bool { - ipSetEntry := v.(string) - if err := c.ipset.AddEntry(clusterNodeIP6Set, ipSetEntry); err != nil { - return false + if c.networkConfig.IPv4Enabled { + if err := c.ipset.CreateIPSet(clusterNodeIPSet, ipset.HashIP, false); err != nil { + return err } - return true - }) + c.clusterNodeIPs.Range(func(_, v interface{}) bool { + ipsetEntry := v.(string) + if err := c.ipset.AddEntry(clusterNodeIPSet, ipsetEntry); err != nil { + return false + } + return true + }) + } } if c.nodeNetworkPolicyEnabled { @@ -709,7 +699,7 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, }...) } - if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() { + if c.multicastEnabled && !isIPv6 && c.networkConfig.TrafficEncapMode.SupportsEncap() { // Drop the multicast packets forwarded from other Nodes in the cluster. This is because // the packet sent out from the sender Pod is already received via tunnel port with encap mode, // and the one forwarded via the underlay network is to send to external receivers @@ -832,7 +822,7 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet, writeLine(iptablesData, iptables.MakeChainLine(antreaPostRoutingChain)) // The masqueraded multicast traffic will become unicast so we // stop traversing this antreaPostRoutingChain for multicast traffic. - if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsNoEncap() { + if c.multicastEnabled && !isIPv6 && c.networkConfig.TrafficEncapMode.SupportsNoEncap() { writeLine(iptablesData, []string{ "-A", antreaPostRoutingChain, "-m", "comment", "--comment", `"Antrea: skip masquerade for multicast traffic"`, @@ -1837,17 +1827,12 @@ func (c *Client) addNodeIP(podCIDR *net.IPNet, nodeIP net.IP) error { if nodeIP == nil { return nil } - ipSetEntry := nodeIP.String() if nodeIP.To4() != nil { + ipSetEntry := nodeIP.String() if err := c.ipset.AddEntry(clusterNodeIPSet, ipSetEntry); err != nil { return err } c.clusterNodeIPs.Store(podCIDR.String(), ipSetEntry) - } else { - if err := c.ipset.AddEntry(clusterNodeIP6Set, ipSetEntry); err != nil { - return err - } - c.clusterNodeIP6s.Store(podCIDR.String(), ipSetEntry) } return nil } @@ -1870,16 +1855,6 @@ func (c *Client) deleteNodeIP(podCIDR *net.IPNet) error { return err } c.clusterNodeIPs.Delete(podCIDRStr) - } else { - obj, exists := c.clusterNodeIP6s.Load(podCIDRStr) - if !exists { - return nil - } - ipSetEntry := obj.(string) - if err := c.ipset.DelEntry(clusterNodeIP6Set, ipSetEntry); err != nil { - return err - } - c.clusterNodeIP6s.Delete(podCIDRStr) } return nil } diff --git a/pkg/agent/route/route_linux_test.go b/pkg/agent/route/route_linux_test.go index 417bc1613a1..2d8b3ac5c15 100644 --- a/pkg/agent/route/route_linux_test.go +++ b/pkg/agent/route/route_linux_test.go @@ -158,7 +158,6 @@ func TestSyncIPSet(t *testing.T) { nodePortsIPv4 []string nodePortsIPv6 []string clusterNodeIPs map[string]string - clusterNodeIP6s map[string]string nodeNetworkPolicyIPSetsIPv4 map[string]sets.Set[string] nodeNetworkPolicyIPSetsIPv6 map[string]sets.Set[string] expectedCalls func(ipset *ipsettest.MockInterfaceMockRecorder) @@ -205,7 +204,6 @@ func TestSyncIPSet(t *testing.T) { nodePortsIPv4: []string{"192.168.0.2,tcp:10000", "127.0.0.1,tcp:10000"}, nodePortsIPv6: []string{"fe80::e643:4bff:fe44:ee,tcp:10000", "::1,tcp:10000"}, clusterNodeIPs: map[string]string{"172.16.3.0/24": "192.168.0.3", "172.16.4.0/24": "192.168.0.4"}, - clusterNodeIP6s: map[string]string{"2001:ab03:cd04:5503::/64": "fe80::e643:4bff:fe03", "2001:ab03:cd04:5504::/64": "fe80::e643:4bff:fe04"}, nodeNetworkPolicyIPSetsIPv4: map[string]sets.Set[string]{"ANTREA-POL-RULE1-4": sets.New[string]("1.1.1.1/32", "2.2.2.2/32")}, nodeNetworkPolicyIPSetsIPv6: map[string]sets.Set[string]{"ANTREA-POL-RULE1-6": sets.New[string]("fec0::1111/128", "fec0::2222/128")}, expectedCalls: func(mockIPSet *ipsettest.MockInterfaceMockRecorder) { @@ -220,11 +218,8 @@ func TestSyncIPSet(t *testing.T) { mockIPSet.AddEntry(antreaNodePortIP6Set, "fe80::e643:4bff:fe44:ee,tcp:10000") mockIPSet.AddEntry(antreaNodePortIP6Set, "::1,tcp:10000") mockIPSet.CreateIPSet(clusterNodeIPSet, ipset.HashIP, false) - mockIPSet.CreateIPSet(clusterNodeIP6Set, ipset.HashIP, true) mockIPSet.AddEntry(clusterNodeIPSet, "192.168.0.3") mockIPSet.AddEntry(clusterNodeIPSet, "192.168.0.4") - mockIPSet.AddEntry(clusterNodeIP6Set, "fe80::e643:4bff:fe03") - mockIPSet.AddEntry(clusterNodeIP6Set, "fe80::e643:4bff:fe04") mockIPSet.CreateIPSet("ANTREA-POL-RULE1-4", ipset.HashNet, false) mockIPSet.CreateIPSet("ANTREA-POL-RULE1-6", ipset.HashNet, true) mockIPSet.AddEntry("ANTREA-POL-RULE1-4", "1.1.1.1/32") @@ -269,7 +264,6 @@ func TestSyncIPSet(t *testing.T) { nodePortsIPv4: sync.Map{}, nodePortsIPv6: sync.Map{}, clusterNodeIPs: sync.Map{}, - clusterNodeIP6s: sync.Map{}, } for _, nodePortIPv4 := range tt.nodePortsIPv4 { c.nodePortsIPv4.Store(nodePortIPv4, struct{}{}) @@ -280,9 +274,6 @@ func TestSyncIPSet(t *testing.T) { for cidr, nodeIP := range tt.clusterNodeIPs { c.clusterNodeIPs.Store(cidr, nodeIP) } - for cidr, nodeIP := range tt.clusterNodeIP6s { - c.clusterNodeIP6s.Store(cidr, nodeIP) - } for set, ips := range tt.nodeNetworkPolicyIPSetsIPv4 { c.nodeNetworkPolicyIPSetsIPv4.Store(set, ips) } @@ -402,7 +393,6 @@ COMMIT :ANTREA-OUTPUT - [0:0] -A ANTREA-PREROUTING -m comment --comment "Antrea: do not track incoming encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --dst-type LOCAL -j NOTRACK -A ANTREA-OUTPUT -m comment --comment "Antrea: do not track outgoing encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --src-type LOCAL -j NOTRACK --A ANTREA-PREROUTING -m comment --comment "Antrea: drop Pod multicast traffic forwarded via underlay network" -m set --match-set CLUSTER-NODE-IP6 src -d 224.0.0.0/4 -j DROP COMMIT *mangle :ANTREA-MANGLE - [0:0] @@ -1790,10 +1780,6 @@ func TestAddAndDeleteNodeIP(t *testing.T) { networkConfig: &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap}, podCIDR: ip.MustParseCIDR("1122:3344::/80"), nodeIP: net.ParseIP("aabb:ccdd::1"), - expectedCalls: func(mockIPSet *ipsettest.MockInterfaceMockRecorder) { - mockIPSet.AddEntry(clusterNodeIP6Set, "aabb:ccdd::1") - mockIPSet.DelEntry(clusterNodeIP6Set, "aabb:ccdd::1") - }, }, } for _, tt := range tests { @@ -1805,25 +1791,23 @@ func TestAddAndDeleteNodeIP(t *testing.T) { networkConfig: tt.networkConfig, multicastEnabled: tt.multicastEnabled, } - tt.expectedCalls(mockIPSet.EXPECT()) + if tt.expectedCalls != nil { + tt.expectedCalls(mockIPSet.EXPECT()) + } ipv6 := tt.nodeIP.To4() == nil assert.NoError(t, c.addNodeIP(tt.podCIDR, tt.nodeIP)) var exists bool - if ipv6 { - _, exists = c.clusterNodeIP6s.Load(tt.podCIDR.String()) - } else { + if !ipv6 { _, exists = c.clusterNodeIPs.Load(tt.podCIDR.String()) + assert.True(t, exists) } - assert.True(t, exists) assert.NoError(t, c.deleteNodeIP(tt.podCIDR)) - if ipv6 { - _, exists = c.clusterNodeIP6s.Load(tt.podCIDR.String()) - } else { + if !ipv6 { _, exists = c.clusterNodeIPs.Load(tt.podCIDR.String()) + assert.False(t, exists) } - assert.False(t, exists) }) } }