From d238ecdd5d37a46a58fab3ebdff5b2648835895c Mon Sep 17 00:00:00 2001 From: ceclinux Date: Thu, 21 Dec 2023 13:44:40 +0800 Subject: [PATCH] [Multicast] Add Support for FlexibleIPAM (#4922) When FlexibleIPAM is enabled for the cluster. Inbound multicast traffic will be forwarded to MulticastRoutingTable. Outbound multicast traffic will forward to local ports, the uplink interface, and the host interface. Signed-off-by: ceclinux --- ci/jenkins/test.sh | 2 +- cmd/antrea-agent/agent.go | 3 +- docs/external-node.md | 2 +- docs/multicast-guide.md | 94 +++++++++------ pkg/agent/multicast/mcast_controller.go | 24 +++- pkg/agent/multicast/mcast_controller_test.go | 47 +++++--- pkg/agent/multicast/mcast_route.go | 4 +- pkg/agent/multicast/mcast_route_linux.go | 7 ++ pkg/agent/multicast/mcast_route_test.go | 2 +- pkg/agent/openflow/client.go | 21 +++- pkg/agent/openflow/client_test.go | 22 ++++ pkg/agent/openflow/multicast.go | 55 ++++++--- pkg/agent/openflow/multicast_test.go | 4 +- pkg/agent/openflow/pipeline.go | 24 ++-- pkg/agent/openflow/testing/mock_openflow.go | 14 +++ test/e2e/antreaipam_anp_test.go | 1 + test/e2e/antreaipam_test.go | 6 + test/e2e/antreapolicy_test.go | 50 ++++---- test/e2e/multicast_test.go | 119 ++++++++++--------- 19 files changed, 326 insertions(+), 175 deletions(-) diff --git a/ci/jenkins/test.sh b/ci/jenkins/test.sh index 84606cb0767..43ab1b9100e 100755 --- a/ci/jenkins/test.sh +++ b/ci/jenkins/test.sh @@ -1145,7 +1145,7 @@ EOF export KUBECONFIG=${KUBECONFIG_PATH} if [[ $TESTBED_TYPE == "flexible-ipam" ]]; then - ./hack/generate-manifest.sh --flexible-ipam --verbose-log > build/yamls/antrea.yml + ./hack/generate-manifest.sh --flexible-ipam --multicast --verbose-log > build/yamls/antrea.yml fi if [[ $TESTCASE =~ "multicast" ]]; then diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 4de729b0c59..44604492295 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -782,7 +782,8 @@ func run(o *Options) error { o.igmpQueryVersions, validator, networkConfig.TrafficEncapMode.SupportsEncap(), - nodeInformer) + nodeInformer, + enableBridgingMode) if err := mcastController.Initialize(); err != nil { return err } diff --git a/docs/external-node.md b/docs/external-node.md index 93c66010ac7..117682e6f0b 100644 --- a/docs/external-node.md +++ b/docs/external-node.md @@ -643,7 +643,7 @@ OVS bridge, and both interfaces are configured with the same MAC address, the match condition of an OpenFlow entry in `L2ForwardingCalcTable` uses the input port number but not the MAC address of the packet. The flow actions are: -1) set flag `OFPortFoundRegMark`, and +1) set flag `OutputToOFPortRegMark`, and 2) set the peer port as the `TargetOFPortField`, and 3) enforce the packet to go to stageIngressSecurity. diff --git a/docs/multicast-guide.md b/docs/multicast-guide.md index 6759d26d8c3..2e53ba6de06 100644 --- a/docs/multicast-guide.md +++ b/docs/multicast-guide.md @@ -2,11 +2,13 @@ Antrea supports multicast traffic in the following scenarios: -1. Pod to Pod - a Pod that has joined a multicast group will receive the multicast - traffic to that group from the Pod senders. -2. Pod to External - external hosts can receive the multicast traffic sent from Pods, - when the Node network supports multicast forwarding / routing to the external hosts. -3. External to Pod - Pods can receive the multicast traffic from external hosts. +1. Pod to Pod - a Pod that has joined a multicast group will receive the + multicast traffic to that group from the Pod senders. +2. Pod to External - external hosts can receive the multicast traffic sent + from Pods, when the Node network supports multicast forwarding / routing to + the external hosts. +3. External to Pod - Pods can receive the multicast traffic from external + hosts. ## Table of Contents @@ -23,20 +25,22 @@ Antrea supports multicast traffic in the following scenarios: - [Maximum number of receiver groups on one Node](#maximum-number-of-receiver-groups-on-one-node) - [Traffic in local network control block](#traffic-in-local-network-control-block) - [Linux kernel](#linux-kernel) + - [Antrea FlexibleIPAM](#antrea-flexibleipam) ## Prerequisites -Multicast support was introduced in Antrea v1.5.0 as an alpha feature, and was graduated to -beta in v1.12.0. +Multicast support was introduced in Antrea v1.5.0 as an alpha feature, and was +graduated to beta in v1.12.0. -* Prior to v1.12.0, a feature gate, `Multicast` must be enabled in the `antrea-controller` - and `antrea-agent` configuration to use the feature. -* Starting from v1.12.0, the feature gate is enabled by default, you need to set the - `multicast.enable` flag to true in the `antrea-agent` configuration to use the feature. +* Prior to v1.12.0, a feature gate, `Multicast` must be enabled in the + `antrea-controller` and `antrea-agent` configuration to use the feature. +* Starting from v1.12.0, the feature gate is enabled by default, you need to set + the `multicast.enable` flag to true in the `antrea-agent` configuration to use + the feature. -There are three other configuration options -`multicastInterfaces`, `igmpQueryVersions`, -and `igmpQueryInterval` for `antrea-agent`. +There are three other configuration options -`multicastInterfaces`, +`igmpQueryVersions`, and `igmpQueryInterval` for `antrea-agent`. ```yaml antrea-agent.conf: | @@ -58,12 +62,15 @@ and `igmpQueryInterval` for `antrea-agent`. ## Multicast NetworkPolicy -Antrea NetworkPolicy and Antrea ClusterNetworkPolicy are supported for the following -types of multicast traffic: +Antrea NetworkPolicy and Antrea ClusterNetworkPolicy are supported for the +following types of multicast traffic: -1. IGMP egress rules: applied to IGMP membership report and IGMP leave group messages. -2. IGMP ingress rules: applied to IGMP query, which includes IGMPv1, IGMPv2, and IGMPv3. -3. Multicast egress rules: applied to non-IGMP multicast traffic from the selected Pods to other Pods or external hosts. +1. IGMP egress rules: applied to IGMP membership report and IGMP leave group + messages. +2. IGMP ingress rules: applied to IGMP query, which includes IGMPv1, IGMPv2, and + IGMPv3. +3. Multicast egress rules: applied to non-IGMP multicast traffic from the + selected Pods to other Pods or external hosts. Note, multicast ingress rules are not supported at the moment. @@ -73,12 +80,13 @@ examples in the Antrea NetworkPolicy document. ## Debugging and collecting multicast statistics -Antrea provides tooling to check multicast group information and multicast traffic statistics. +Antrea provides tooling to check multicast group information and multicast +traffic statistics. ### Pod multicast group information -The `kubectl get multicastgroups` command prints multicast groups joined by Pods in the cluster. -Example output of the command: +The `kubectl get multicastgroups` command prints multicast groups joined by Pods +in the cluster. Example output of the command: ```bash $ kubectl get multicastgroups @@ -89,17 +97,20 @@ GROUP PODS ### Inbound and outbound multicast traffic statistics -`antctl` supports printing multicast traffic statistics of Pods. Please refer to the corresponding [antctl user guide section](antctl.md#multicast-commands). +`antctl` supports printing multicast traffic statistics of Pods. Please refer to +the corresponding [antctl user guide section](antctl.md#multicast-commands). ### Multicast NetworkPolicy statistics -The [Antrea NetworkPolicyStats feature](feature-gates.md#networkpolicystats) also supports multicast NetworkPolices. +The [Antrea NetworkPolicyStats feature](feature-gates.md#networkpolicystats) +also supports multicast NetworkPolices. ## Use case example -This section will take multicast video streaming as an example to demonstrate how multicast works -with Antrea. In this example, [VLC](https://www.videolan.org/vlc/) multimedia tools are used to -generate and consume multicast video streams. +This section will take multicast video streaming as an example to demonstrate +how multicast works with Antrea. In this example, +[VLC](https://www.videolan.org/vlc/) multimedia tools are used to generate and +consume multicast video streams. To start a video streaming server, we start a VLC Pod to stream a sample video to the multicast IP address `239.255.12.42` with TTL 6. @@ -112,9 +123,9 @@ You can verify multicast traffic is sent out from this Pod by running `antctl get podmulticaststats` in the `antrea-agent` Pod on the local Node, which indicates the VLC Pod is sending out multicast video streams. -You can also check the multicast routes on the Node by running command `ip mroute`, -which should print the following route for forwarding the multicast traffic from -the Antrea gateway interface to the transport interface. +You can also check the multicast routes on the Node by running command +`ip mroute`, which should print the following route for forwarding the multicast +traffic from the Antrea gateway interface to the transport interface. ```bash $ ip mroute @@ -136,13 +147,14 @@ has joined multicast group `239.255.12.42`. ## Limitations -This feature is currently supported only for IPv4 Linux clusters. -Support for Windows and IPv6 will be added in the future. +This feature is currently supported only for IPv4 Linux clusters. Support for +Windows and IPv6 will be added in the future. ### Encap mode -Configuration option `multicastInterfaces` is not supported with encap mode. -Multicast packets in encap mode are SNATed and forwarded to the transport interface only. +The configuration option `multicastInterfaces` is not supported with encap mode. +Multicast packets in encap mode are SNATed and forwarded to the transport +interface only. ### Maximum number of receiver groups on one Node @@ -155,13 +167,21 @@ join more than 20 groups. Multicast IPs in [Local Network Control Block](https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xhtml#multicast-addresses-1) (224.0.0.0/24) can only work in encap mode. Multicast traffic destined for those addresses -is not expected to be forwarded, therefore, no multicast route will be configured for them. -External hosts are not supposed to send and receive traffic with those addresses either. +is not expected to be forwarded, therefore, no multicast route will be +configured for them. External hosts are not supposed to send and receive traffic +with those addresses either. ### Linux kernel -If the following situations apply to your Nodes, you may observe multicast traffic -is not routed correctly: +If the following situations apply to your Nodes, you may observe multicast +traffic is not routed correctly: 1. Node kernel version under 5.4 2. Node network doesn't support IGMP snooping + +### Antrea FlexibleIPAM + +The configuration option `multicastInterfaces` is not supported with +[Antrea FlexibleIPAM](antrea-ipam.md#antrea-flexible-ipam). When Antrea +FlexibleIPAM is enabled, multicast packets are forwarded to the uplink interface +only. diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index 7951f90b264..9de45eff092 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -262,8 +262,9 @@ type Controller struct { // nodeGroupID is the OpenFlow group ID in OVS which is used to send IGMP report messages to other Nodes. nodeGroupID binding.GroupIDType // installedNodes is the installed Node set that the IGMP report message is sent to. - installedNodes sets.Set[string] - encapEnabled bool + installedNodes sets.Set[string] + encapEnabled bool + flexibleIPAMEnabled bool } func NewMulticastController(ofClient openflow.Client, @@ -277,13 +278,14 @@ func NewMulticastController(ofClient openflow.Client, igmpQueryVersions []uint8, validator types.McastNetworkPolicyController, isEncap bool, - nodeInformer coreinformers.NodeInformer) *Controller { + nodeInformer coreinformers.NodeInformer, + enableFlexibleIPAM bool) *Controller { eventCh := make(chan *mcastGroupEvent, workerCount) groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, igmpQueryVersions, validator, isEncap) groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) - multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces, isEncap) + multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces, isEncap, enableFlexibleIPAM) c := &Controller{ ofClient: ofClient, ifaceStore: ifaceStore, @@ -300,6 +302,7 @@ func NewMulticastController(ofClient openflow.Client, mcastGroupTimeout: igmpQueryInterval * 3, queryGroupId: v4GroupAllocator.Allocate(), encapEnabled: isEncap, + flexibleIPAMEnabled: enableFlexibleIPAM, } if isEncap { c.nodeGroupID = v4GroupAllocator.Allocate() @@ -336,10 +339,17 @@ func (c *Controller) Initialize() error { if err != nil { return err } + if c.flexibleIPAMEnabled { + if err := c.ofClient.InstallMulticastFlexibleIPAMFlows(); err != nil { + klog.ErrorS(err, "Failed to install OpenFlow flows to handle multicast traffic when flexibleIPAM is enabled") + return err + } + } if c.encapEnabled { // Install OpenFlow group to send the multicast groups that local Pods joined to all other Nodes in the cluster. if err := c.ofClient.InstallMulticastGroup(c.nodeGroupID, nil, nil); err != nil { klog.ErrorS(err, "Failed to update OpenFlow group for remote Nodes") + return err } if err := c.ofClient.InstallMulticastRemoteReportFlows(c.nodeGroupID); err != nil { klog.ErrorS(err, "Failed to install OpenFlow group and flow to send IGMP report to other Nodes") @@ -432,7 +442,11 @@ func (c *Controller) syncGroup(groupKey string) error { } status := obj.(*GroupMemberStatus) memberPorts := make([]uint32, 0, len(status.localMembers)+1) - memberPorts = append(memberPorts, config.HostGatewayOFPort) + if c.flexibleIPAMEnabled { + memberPorts = append(memberPorts, config.UplinkOFPort, c.nodeConfig.HostInterfaceOFPort) + } else { + memberPorts = append(memberPorts, config.HostGatewayOFPort) + } for memberInterfaceName := range status.localMembers { obj, found := c.ifaceStore.GetInterfaceByName(memberInterfaceName) if !found { diff --git a/pkg/agent/multicast/mcast_controller_test.go b/pkg/agent/multicast/mcast_controller_test.go index 15351780fba..a322a657a58 100644 --- a/pkg/agent/multicast/mcast_controller_test.go +++ b/pkg/agent/multicast/mcast_controller_test.go @@ -91,7 +91,7 @@ func TestAddGroupMemberStatus(t *testing.T) { time: time.Now(), iface: if1, } - mctrl := newMockMulticastController(t, false) + mctrl := newMockMulticastController(t, false, false) err := mctrl.initialize(t) mctrl.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{ {Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, @@ -114,7 +114,7 @@ func TestAddGroupMemberStatus(t *testing.T) { } func TestUpdateGroupMemberStatus(t *testing.T) { - mctrl := newMockMulticastController(t, false) + mctrl := newMockMulticastController(t, false, false) err := mctrl.initialize(t) assert.NoError(t, err) mgroup := net.ParseIP("224.96.1.4") @@ -180,7 +180,7 @@ func TestUpdateGroupMemberStatus(t *testing.T) { } func TestCheckNodeUpdate(t *testing.T) { - mockController := newMockMulticastController(t, false) + mockController := newMockMulticastController(t, false, false) err := mockController.initialize(t) require.NoError(t, err) @@ -256,7 +256,7 @@ func TestCheckNodeUpdate(t *testing.T) { } func TestCheckLastMember(t *testing.T) { - mctrl := newMockMulticastController(t, false) + mctrl := newMockMulticastController(t, false, false) workerCount = 1 lastProbe := time.Now() mgroup := net.ParseIP("224.96.1.2") @@ -348,7 +348,7 @@ func TestCheckLastMember(t *testing.T) { func TestGetGroupPods(t *testing.T) { now := time.Now() - mctrl := newMockMulticastController(t, false) + mctrl := newMockMulticastController(t, false, false) err := mctrl.initialize(t) require.NoError(t, err) groupMemberStatuses := []*GroupMemberStatus{ @@ -384,7 +384,7 @@ func TestGetGroupPods(t *testing.T) { } func TestGetPodStats(t *testing.T) { - mctrl := newMockMulticastController(t, false) + mctrl := newMockMulticastController(t, false, false) err := mctrl.initialize(t) require.NoError(t, err) @@ -401,7 +401,7 @@ func TestGetPodStats(t *testing.T) { } func TestGetAllPodStats(t *testing.T) { - mctrl := newMockMulticastController(t, false) + mctrl := newMockMulticastController(t, false, false) err := mctrl.initialize(t) require.NoError(t, err) @@ -445,7 +445,7 @@ func TestGetAllPodStats(t *testing.T) { } func TestClearStaleGroupsCreatingLeaveEvent(t *testing.T) { - mctrl := newMockMulticastController(t, false) + mctrl := newMockMulticastController(t, false, false) workerCount = 1 err := mctrl.initialize(t) require.NoError(t, err) @@ -481,7 +481,7 @@ func TestClearStaleGroupsCreatingLeaveEvent(t *testing.T) { } func TestClearStaleGroups(t *testing.T) { - mctrl := newMockMulticastController(t, false) + mctrl := newMockMulticastController(t, false, false) workerCount = 1 err := mctrl.initialize(t) require.NoError(t, err) @@ -557,7 +557,7 @@ func TestClearStaleGroups(t *testing.T) { } func TestProcessPacketIn(t *testing.T) { - mockController := newMockMulticastController(t, false) + mockController := newMockMulticastController(t, false, false) snooper := mockController.igmpSnooper stopCh := make(chan struct{}) defer close(stopCh) @@ -732,14 +732,14 @@ func TestProcessPacketIn(t *testing.T) { } func TestEncapModeInitialize(t *testing.T) { - mockController := newMockMulticastController(t, true) - assert.True(t, mockController.nodeGroupID != 0) + mockController := newMockMulticastController(t, true, false) + assert.NotZero(t, mockController.nodeGroupID) err := mockController.initialize(t) assert.NoError(t, err) } func TestEncapLocalReportAndNotifyRemote(t *testing.T) { - mockController := newMockMulticastController(t, true) + mockController := newMockMulticastController(t, true, false) _ = mockController.initialize(t) mockController.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{ {Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}}, @@ -847,7 +847,7 @@ func TestEncapLocalReportAndNotifyRemote(t *testing.T) { } func TestNodeUpdate(t *testing.T) { - mockController := newMockMulticastController(t, true) + mockController := newMockMulticastController(t, true, false) stopCh := make(chan struct{}) defer close(stopCh) informerFactory.Start(stopCh) @@ -941,7 +941,7 @@ func TestNodeUpdate(t *testing.T) { } func TestMemberChanged(t *testing.T) { - mockController := newMockMulticastController(t, false) + mockController := newMockMulticastController(t, false, false) _ = mockController.initialize(t) containerA := &interfacestore.ContainerInterfaceConfig{PodNamespace: "nameA", PodName: "podA", ContainerID: "tttt"} @@ -998,7 +998,7 @@ func TestMemberChanged(t *testing.T) { } func TestConcurrentEventHandlerAndWorkers(t *testing.T) { - c := newMockMulticastController(t, true) + c := newMockMulticastController(t, true, false) c.ifaceStore = interfacestore.NewInterfaceStore() stopCh := make(chan struct{}) defer close(stopCh) @@ -1084,7 +1084,7 @@ func TestConcurrentEventHandlerAndWorkers(t *testing.T) { } func TestRemoteMemberJoinLeave(t *testing.T) { - mockController := newMockMulticastController(t, true) + mockController := newMockMulticastController(t, true, false) _ = mockController.initialize(t) stopCh := make(chan struct{}) defer close(stopCh) @@ -1240,7 +1240,7 @@ func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEven } } -func newMockMulticastController(t *testing.T, isEncap bool) *Controller { +func newMockMulticastController(t *testing.T, isEncap bool, enableFlexibleIPAM bool) *Controller { controller := gomock.NewController(t) mockOFClient = openflowtest.NewMockClient(controller) mockIfaceStore = ifaceStoretest.NewMockInterfaceStore(controller) @@ -1255,10 +1255,16 @@ func newMockMulticastController(t *testing.T, isEncap bool) *Controller { clientset = fake.NewSimpleClientset() informerFactory = informers.NewSharedInformerFactory(clientset, 12*time.Hour) nodeInformer := informerFactory.Core().V1().Nodes() - mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, nodeInformer) + mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, nodeInformer, enableFlexibleIPAM) return mctrl } +func TestFlexibleIPAMModeInitialize(t *testing.T) { + mockController := newMockMulticastController(t, false, true) + err := mockController.initialize(t) + assert.NoError(t, err) +} + func (c *Controller) initialize(t *testing.T) error { mockOFClient.EXPECT().InstallMulticastGroup(c.queryGroupId, gomock.Any(), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any()) @@ -1269,6 +1275,9 @@ func (c *Controller) initialize(t *testing.T) error { mockOFClient.EXPECT().InstallMulticastGroup(c.nodeGroupID, gomock.Any(), gomock.Any()).Times(1) mockOFClient.EXPECT().InstallMulticastRemoteReportFlows(c.nodeGroupID).Times(1) } + if c.flexibleIPAMEnabled { + mockOFClient.EXPECT().InstallMulticastFlexibleIPAMFlows().Times(1) + } return c.Initialize() } diff --git a/pkg/agent/multicast/mcast_route.go b/pkg/agent/multicast/mcast_route.go index 43ef5a6aa20..fd5d481549a 100644 --- a/pkg/agent/multicast/mcast_route.go +++ b/pkg/agent/multicast/mcast_route.go @@ -34,7 +34,7 @@ const ( MulticastRecvBufferSize = 128 ) -func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.Set[string], encapEnabled bool) *MRouteClient { +func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.Set[string], encapEnabled bool, flexibleIPAMEnabled bool) *MRouteClient { var m = &MRouteClient{ igmpMsgChan: make(chan []byte, workerCount), nodeConfig: nodeconfig, @@ -42,6 +42,7 @@ func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, mul inboundRouteCache: cache.NewIndexer(getMulticastInboundEntryKey, cache.Indexers{GroupNameIndexName: inboundGroupIndexFunc}), multicastInterfaces: sets.List(multicastInterfaces), socket: multicastSocket, + flexibleIPAMEnabled: flexibleIPAMEnabled, } return m } @@ -79,6 +80,7 @@ type MRouteClient struct { multicastInterfaceConfigs []multicastInterfaceConfig internalInterfaceVIF uint16 externalInterfaceVIFs []uint16 + flexibleIPAMEnabled bool } // multicastInterfacesJoinMgroup allows multicast interfaces to join multicast group, diff --git a/pkg/agent/multicast/mcast_route_linux.go b/pkg/agent/multicast/mcast_route_linux.go index 9495a007647..9695564bee2 100644 --- a/pkg/agent/multicast/mcast_route_linux.go +++ b/pkg/agent/multicast/mcast_route_linux.go @@ -65,6 +65,13 @@ func (c *MRouteClient) run(stopCh <-chan struct{}) { for { buf := make([]byte, MulticastRecvBufferSize) n, _ := syscall.Read(c.socket.GetFD(), buf) + // When Antrea FlexibleIPAM is enabled, messages received by the socket + // will be dropped directly because we won't create any route from the upcall igmpmsg messages. + // In addition, by reading the socket, we can avoid potential errors such as memory bloat. + if c.flexibleIPAMEnabled { + klog.V(4).InfoS("Message was received from the multicast routing socket", "message", buf[:n]) + continue + } if n > 0 { c.igmpMsgChan <- buf[:n] } diff --git a/pkg/agent/multicast/mcast_route_test.go b/pkg/agent/multicast/mcast_route_test.go index c55774ba304..2a5ffbc38c6 100644 --- a/pkg/agent/multicast/mcast_route_test.go +++ b/pkg/agent/multicast/mcast_route_test.go @@ -236,7 +236,7 @@ func newMockMulticastRouteClient(t *testing.T) *MRouteClient { groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) - return newRouteClient(nodeConfig, groupCache, mockMulticastSocket, sets.New[string](if1.InterfaceName), false) + return newRouteClient(nodeConfig, groupCache, mockMulticastSocket, sets.New[string](if1.InterfaceName), false, false) } func (c *MRouteClient) initialize(t *testing.T) error { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index bbd558e77d1..209e1049c4e 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -322,7 +322,10 @@ type Client interface { // UninstallMulticastFlows removes the flow matching the given multicastIP. UninstallMulticastFlows(multicastIP net.IP) error - + // InstallMulticastFlexibleIPAMFlows installs two flows and forwards them to the first table of Multicast Pipeline + // when flexibleIPAM is enabled, with one flow matching inbound multicast traffic from the uplink and the other from + // the host interface, making multicast packets coming from the host and other Nodes be forward to the OVS multicast pipeline. + InstallMulticastFlexibleIPAMFlows() error // InstallMulticastRemoteReportFlows installs flows to forward the IGMP report messages to the other Nodes, // and packetIn the report messages to Antrea Agent which is received via tunnel port. // The OpenFlow group identified by groupID is used to forward packet to all other Nodes in the cluster @@ -968,8 +971,13 @@ func (c *client) generatePipelines() { } if c.enableMulticast { + uplinkPort := uint32(0) + if c.nodeConfig.UplinkNetConfig != nil { + uplinkPort = c.nodeConfig.UplinkNetConfig.OFPort + } + // TODO: add support for IPv6 protocol - c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}, c.bridge, c.enableAntreaPolicy, c.nodeConfig.GatewayConfig.OFPort, c.networkConfig.TrafficEncapMode.SupportsEncap(), config.DefaultTunOFPort) + c.featureMulticast = newFeatureMulticast(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP}, c.bridge, c.enableAntreaPolicy, c.nodeConfig.GatewayConfig.OFPort, c.networkConfig.TrafficEncapMode.SupportsEncap(), config.DefaultTunOFPort, uplinkPort, c.nodeConfig.HostInterfaceOFPort, c.connectUplinkToBridge) c.activatedFeatures = append(c.activatedFeatures, c.featureMulticast) } @@ -1438,6 +1446,15 @@ func (c *client) UninstallMulticastFlows(multicastIP net.IP) error { return c.deleteFlows(c.featureMulticast.cachedFlows, cacheKey) } +func (c *client) InstallMulticastFlexibleIPAMFlows() error { + firstMulticastTable := c.pipelines[pipelineMulticast].GetFirstTable() + flows := c.featureMulticast.multicastForwardFlexibleIPAMFlows(firstMulticastTable) + cacheKey := "multicast_flexible_ipam" + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + return c.addFlows(c.featureMulticast.cachedFlows, cacheKey, flows) +} + func (c *client) InstallMulticastRemoteReportFlows(groupID binding.GroupIDType) error { firstMulticastTable := c.pipelines[pipelineMulticast].GetFirstTable() flows := c.featureMulticast.multicastRemoteReportFlows(groupID, firstMulticastTable) diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 1f7f8ab0a54..11107f32b56 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -2301,6 +2301,28 @@ func Test_client_InstallMulticastRemoteReportFlows(t *testing.T) { assert.ElementsMatch(t, expectedFlows, getFlowStrings(fCacheI)) } +func Test_client_InstallMulticasFlexibleIPAMFlows(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + m := oftest.NewMockOFEntryOperations(ctrl) + fc := newFakeClient(m, true, false, config.K8sNode, config.TrafficEncapModeNoEncap, enableMulticast, enableConnectUplinkToBridge, disableEgress) + defer resetPipelines() + + expectedFlows := []string{ + "cookie=0x1050000000000, table=Classifier, priority=210,ip,in_port=4,nw_dst=224.0.0.0/4 actions=goto_table:MulticastEgressRule", + "cookie=0x1050000000000, table=Classifier, priority=210,ip,in_port=4294967294,nw_dst=224.0.0.0/4 actions=goto_table:MulticastEgressRule", + } + + m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) + + cacheKey := "multicast_flexible_ipam" + + assert.NoError(t, fc.InstallMulticastFlexibleIPAMFlows()) + fCacheI, ok := fc.featureMulticast.cachedFlows.Load(cacheKey) + require.True(t, ok) + assert.ElementsMatch(t, expectedFlows, getFlowStrings(fCacheI)) +} + func Test_client_SendIGMPQueryPacketOut(t *testing.T) { ctrl := gomock.NewController(t) mockBridge := ovsoftest.NewMockBridge(ctrl) diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index 0cf1384d51c..87befdffe47 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -27,12 +27,15 @@ import ( ) type featureMulticast struct { - cookieAllocator cookie.Allocator - ipProtocols []binding.Protocol - bridge binding.Bridge - gatewayPort uint32 - encapEnabled bool - tunnelPort uint32 + cookieAllocator cookie.Allocator + ipProtocols []binding.Protocol + bridge binding.Bridge + gatewayPort uint32 + encapEnabled bool + flexibleIPAMEnabled bool + tunnelPort uint32 + uplinkPort uint32 + hostOFPort uint32 cachedFlows *flowCategoryCache groupCache sync.Map @@ -45,18 +48,21 @@ func (f *featureMulticast) getFeatureName() string { return "Multicast" } -func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, bridge binding.Bridge, anpEnabled bool, gwPort uint32, encapEnabled bool, tunnelPort uint32) *featureMulticast { +func newFeatureMulticast(cookieAllocator cookie.Allocator, ipProtocols []binding.Protocol, bridge binding.Bridge, anpEnabled bool, gwPort uint32, encapEnabled bool, tunnelPort uint32, uplinkPort uint32, hostOFPort uint32, flexibleIPAMEnabled bool) *featureMulticast { return &featureMulticast{ - cookieAllocator: cookieAllocator, - ipProtocols: ipProtocols, - cachedFlows: newFlowCategoryCache(), - bridge: bridge, - category: cookie.Multicast, - groupCache: sync.Map{}, - enableAntreaPolicy: anpEnabled, - gatewayPort: gwPort, - encapEnabled: encapEnabled, - tunnelPort: tunnelPort, + cookieAllocator: cookieAllocator, + ipProtocols: ipProtocols, + cachedFlows: newFlowCategoryCache(), + bridge: bridge, + category: cookie.Multicast, + groupCache: sync.Map{}, + enableAntreaPolicy: anpEnabled, + gatewayPort: gwPort, + encapEnabled: encapEnabled, + tunnelPort: tunnelPort, + uplinkPort: uplinkPort, + hostOFPort: hostOFPort, + flexibleIPAMEnabled: flexibleIPAMEnabled, } } @@ -200,6 +206,21 @@ func (f *featureMulticast) replayMeters() []binding.OFEntry { return nil } +func (f *featureMulticast) multicastForwardFlexibleIPAMFlows(table binding.Table) []binding.Flow { + ports := []uint32{f.uplinkPort, f.hostOFPort} + flows := make([]binding.Flow, 0, len(ports)) + for _, port := range ports { + flows = append(flows, ClassifierTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchInPort(port). + MatchProtocol(binding.ProtocolIP). + MatchDstIPNet(*types.McastCIDR). + Action().GotoTable(table.GetID()). + Done()) + } + return flows +} + func (f *featureMulticast) multicastRemoteReportFlows(groupID binding.GroupIDType, firstMulticastTable binding.Table) []binding.Flow { return []binding.Flow{ // This flow outputs the IGMP report message sent from Antrea Agent to an OpenFlow group which is expected to diff --git a/pkg/agent/openflow/multicast_test.go b/pkg/agent/openflow/multicast_test.go index d03f2d36447..af3cb1244d0 100644 --- a/pkg/agent/openflow/multicast_test.go +++ b/pkg/agent/openflow/multicast_test.go @@ -29,7 +29,7 @@ func multicastInitFlows(isEncap bool) []string { "cookie=0x1050000000000, table=MulticastEgressPodMetric, priority=210,igmp actions=goto_table:MulticastRouting", "cookie=0x1050000000000, table=MulticastRouting, priority=210,igmp,reg0=0x3/0xf actions=controller(id=32776,reason=no_match,userdata=03,max_len=65535)", "cookie=0x1050000000000, table=MulticastRouting, priority=210,igmp,reg0=0x1/0xf actions=controller(id=32776,reason=no_match,userdata=03,max_len=65535)", - "cookie=0x1050000000000, table=MulticastRouting, priority=190,ip actions=set_field:0x200000/0x600000->reg0,set_field:0x2->reg1,goto_table:MulticastOutput", + "cookie=0x1050000000000, table=MulticastRouting, priority=190,ip actions=output:2", "cookie=0x1050000000000, table=MulticastIngressPodMetric, priority=210,igmp actions=goto_table:MulticastOutput", "cookie=0x1050000000000, table=MulticastOutput, priority=210,reg0=0x200001/0x60000f,reg1=0x2 actions=drop", "cookie=0x1050000000000, table=MulticastOutput, priority=210,reg0=0x200002/0x60000f,reg1=0x1 actions=drop", @@ -39,7 +39,7 @@ func multicastInitFlows(isEncap bool) []string { return []string{ "cookie=0x1050000000000, table=MulticastIngressPodMetric, priority=210,igmp actions=goto_table:MulticastOutput", "cookie=0x1050000000000, table=MulticastRouting, priority=210,igmp,reg0=0x3/0xf actions=controller(id=32776,reason=no_match,userdata=03,max_len=65535)", - "cookie=0x1050000000000, table=MulticastRouting, priority=190,ip actions=set_field:0x200000/0x600000->reg0,set_field:0x2->reg1,goto_table:MulticastOutput", + "cookie=0x1050000000000, table=MulticastRouting, priority=190,ip actions=output:2", "cookie=0x1050000000000, table=MulticastEgressPodMetric, priority=210,igmp actions=goto_table:MulticastRouting", "cookie=0x1050000000000, table=MulticastEgressRule, priority=64990,igmp,reg0=0x3/0xf actions=goto_table:MulticastRouting", "cookie=0x1050000000000, table=MulticastOutput, priority=200,reg0=0x200000/0x600000 actions=output:NXM_NX_REG1[]", diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index ed1d6e9b3ab..fd38572aad0 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2905,20 +2905,24 @@ func (f *featureMulticast) localMulticastForwardFlows(multicastIP net.IP, groupI } } -// externalMulticastReceiverFlow generates the flow to output multicast packets to Antrea gateway, so that local Pods can -// send multicast packets to access the external receivers. For the case that one or more local Pods have joined the target -// multicast group, it is handled by the flows created by function "localMulticastForwardFlows" after local Pods report the -// IGMP membership. +// externalMulticastReceiverFlow generates the flow to output multicast packets to Antrea gateway interface (to the host interface +// and the uplink interface when flexibleIPAM is enabled), so that local Pods can send multicast packets to the external receivers. +// For the case that one or more local Pods have joined the target multicast group, it is handled by the flows created by +// function "localMulticastForwardFlows" after local Pods report the IGMP membership. // Because there are ingress tables between MulticastRoutingTable and MulticastOutputTable, while currently ingress rules only // support IGMP query, it is not necessary to goto the ingress tables for other multicast traffic. func (f *featureMulticast) externalMulticastReceiverFlow() binding.Flow { - return MulticastRoutingTable.ofTable.BuildFlow(priorityLow). + outputPorts := []uint32{f.gatewayPort} + if f.flexibleIPAMEnabled { + outputPorts = []uint32{f.hostOFPort, f.uplinkPort} + } + flow := MulticastRoutingTable.ofTable.BuildFlow(priorityLow). Cookie(f.cookieAllocator.Request(f.category).Raw()). - MatchProtocol(binding.ProtocolIP). - Action().LoadRegMark(OutputToOFPortRegMark). - Action().LoadToRegField(TargetOFPortField, f.gatewayPort). - Action().GotoStage(stageOutput). - Done() + MatchProtocol(binding.ProtocolIP) + for _, outputPort := range outputPorts { + flow = flow.Action().Output(outputPort) + } + return flow.Done() } // NewClient is the constructor of the Client interface. diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 8137fda4ef3..e778cae3203 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -293,6 +293,20 @@ func (mr *MockClientMockRecorder) InstallEndpointFlows(arg0, arg1 any) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallEndpointFlows", reflect.TypeOf((*MockClient)(nil).InstallEndpointFlows), arg0, arg1) } +// InstallMulticastFlexibleIPAMFlows mocks base method. +func (m *MockClient) InstallMulticastFlexibleIPAMFlows() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallMulticastFlexibleIPAMFlows") + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallMulticastFlexibleIPAMFlows indicates an expected call of InstallMulticastFlexibleIPAMFlows. +func (mr *MockClientMockRecorder) InstallMulticastFlexibleIPAMFlows() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticastFlexibleIPAMFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticastFlexibleIPAMFlows)) +} + // InstallMulticastFlows mocks base method. func (m *MockClient) InstallMulticastFlows(arg0 net.IP, arg1 openflow.GroupIDType) error { m.ctrl.T.Helper() diff --git a/test/e2e/antreaipam_anp_test.go b/test/e2e/antreaipam_anp_test.go index 9f24825e095..e5c0ef8ce02 100644 --- a/test/e2e/antreaipam_anp_test.go +++ b/test/e2e/antreaipam_anp_test.go @@ -155,6 +155,7 @@ func TestAntreaIPAMAntreaPolicy(t *testing.T) { t.Run("Case=ACNPAntreaIPAMNodePortServiceSupport", func(t *testing.T) { testACNPNodePortServiceSupport(t, data, testAntreaIPAMNamespace) }) t.Run("Case=ACNPAntreaIPAMVLAN11NodePortServiceSupport", func(t *testing.T) { testACNPNodePortServiceSupport(t, data, testAntreaIPAMNamespace11) }) + t.Run("Case=ACNPAntreaIPAMMulticast", func(t *testing.T) { testMulticastNP(t, data, testAntreaIPAMNamespace) }) }) // print results for reachability tests printResults() diff --git a/test/e2e/antreaipam_test.go b/test/e2e/antreaipam_test.go index 48f30bbab1a..f1cec366f73 100644 --- a/test/e2e/antreaipam_test.go +++ b/test/e2e/antreaipam_test.go @@ -251,6 +251,12 @@ func TestAntreaIPAM(t *testing.T) { testAntreaIPAMStatefulSet(t, data, nil) checkIPPoolsEmpty(t, data, ipPools) }) + + t.Run("testMulticastWithFlexibleIPAM", func(t *testing.T) { + skipIfHasWindowsNodes(t) + skipIfNotIPv4Cluster(t) + runMulticastTestCases(t, data, testAntreaIPAMNamespace) + }) } func testAntreaIPAMPodConnectivitySameNode(t *testing.T, data *TestData) { diff --git a/test/e2e/antreapolicy_test.go b/test/e2e/antreapolicy_test.go index 9b6d5409d05..27c5ae0cb28 100644 --- a/test/e2e/antreapolicy_test.go +++ b/test/e2e/antreapolicy_test.go @@ -3877,17 +3877,16 @@ func testACNPNodePortServiceSupport(t *testing.T, data *TestData, serverNamespac failOnError(k8sUtils.DeleteACNP(builder.Name), t) } -func testACNPIGMPQueryAllow(t *testing.T, data *TestData) { - testACNPIGMPQuery(t, data, "test-acnp-igmp-query-allow", "testMulticastIGMPQueryAllow", "224.3.4.13", crdv1beta1.RuleActionAllow) +func testACNPIGMPQueryAllow(t *testing.T, data *TestData, testNamespace string) { + testACNPIGMPQuery(t, data, "test-acnp-igmp-query-allow", "testMulticastIGMPQueryAllow", "224.3.4.13", crdv1beta1.RuleActionAllow, testNamespace) } -func testACNPIGMPQueryDrop(t *testing.T, data *TestData) { - testACNPIGMPQuery(t, data, "test-acnp-igmp-query-drop", "testMulticastIGMPQueryDrop", "224.3.4.14", crdv1beta1.RuleActionDrop) +func testACNPIGMPQueryDrop(t *testing.T, data *TestData, testNamespace string) { + testACNPIGMPQuery(t, data, "test-acnp-igmp-query-drop", "testMulticastIGMPQueryDrop", "224.3.4.14", crdv1beta1.RuleActionDrop, testNamespace) } -func testACNPIGMPQuery(t *testing.T, data *TestData, acnpName, caseName, groupAddress string, action crdv1beta1.RuleAction) { +func testACNPIGMPQuery(t *testing.T, data *TestData, acnpName, caseName, groupAddress string, action crdv1beta1.RuleAction, testNamespace string) { mcjoinWaitTimeout := defaultTimeout / time.Second - testNamespace := data.testNamespace mc := multicastTestcase{ name: caseName, senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: false}, @@ -3898,7 +3897,7 @@ func testACNPIGMPQuery(t *testing.T, data *TestData, acnpName, caseName, groupAd senderName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-sender-", nodeName(mc.senderConfig.nodeIdx), testNamespace, mc.senderConfig.isHostNetwork) defer cleanupFunc() var wg sync.WaitGroup - receiverNames, cleanupFuncs := setupReceivers(t, data, mc, mcjoinWaitTimeout, &wg) + receiverNames, cleanupFuncs := setupReceivers(t, data, mc, mcjoinWaitTimeout, testNamespace, &wg) for _, cleanupFunc := range cleanupFuncs { defer cleanupFunc() } @@ -3909,11 +3908,11 @@ func testACNPIGMPQuery(t *testing.T, data *TestData, acnpName, caseName, groupAd data.RunCommandFromPod(testNamespace, senderName, mcjoinContainerName, sendMulticastCommand) }() - tcpdumpName, _, cleanupFunc := createAndWaitForPod(t, data, data.createToolboxPodOnNode, "test-tcpdump-", nodeName(mc.receiverConfigs[0].nodeIdx), testNamespace, true) + tcpdumpName, _, cleanupFunc := createAndWaitForPod(t, data, data.createToolboxPodOnNode, "test-tcpdump-", nodeName(mc.receiverConfigs[0].nodeIdx), data.testNamespace, true) defer cleanupFunc() queryGroupAddress := "224.0.0.1" - cmd, err := generatePacketCaptureCmd(t, data, 15, queryGroupAddress, nodeName(mc.receiverConfigs[0].nodeIdx), receiverNames[0]) + cmd, err := generatePacketCaptureCmd(t, data, 15, queryGroupAddress, nodeName(mc.receiverConfigs[0].nodeIdx), receiverNames[0], testNamespace) if err != nil { t.Fatalf("failed to call generateConnCheckCmd: %v", err) } @@ -3963,17 +3962,16 @@ func testACNPIGMPQuery(t *testing.T, data *TestData, acnpName, caseName, groupAd } } -func testACNPMulticastEgressAllow(t *testing.T, data *TestData) { - testACNPMulticastEgress(t, data, "test-acnp-multicast-egress-allow", "testMulticastEgressAllowTraffic", "224.3.4.15", crdv1beta1.RuleActionAllow) +func testACNPMulticastEgressAllow(t *testing.T, data *TestData, testNamespace string) { + testACNPMulticastEgress(t, data, "test-acnp-multicast-egress-allow", "testMulticastEgressAllowTraffic", "224.3.4.15", crdv1beta1.RuleActionAllow, testNamespace) } -func testACNPMulticastEgressDrop(t *testing.T, data *TestData) { - testACNPMulticastEgress(t, data, "test-acnp-multicast-egress-drop", "testMulticastEgressDropTrafficFor", "224.3.4.16", crdv1beta1.RuleActionDrop) +func testACNPMulticastEgressDrop(t *testing.T, data *TestData, testNamespace string) { + testACNPMulticastEgress(t, data, "test-acnp-multicast-egress-drop", "testMulticastEgressDropTrafficFor", "224.3.4.16", crdv1beta1.RuleActionDrop, testNamespace) } -func testACNPMulticastEgress(t *testing.T, data *TestData, acnpName, caseName, groupAddress string, action crdv1beta1.RuleAction) { +func testACNPMulticastEgress(t *testing.T, data *TestData, acnpName, caseName, groupAddress string, action crdv1beta1.RuleAction, testNamespace string) { mcjoinWaitTimeout := defaultTimeout / time.Second - testNamespace := data.testNamespace mc := multicastTestcase{ name: caseName, senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: false}, @@ -3984,7 +3982,7 @@ func testACNPMulticastEgress(t *testing.T, data *TestData, acnpName, caseName, g senderName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-sender-", nodeName(mc.senderConfig.nodeIdx), testNamespace, mc.senderConfig.isHostNetwork) defer cleanupFunc() var wg sync.WaitGroup - receiverNames, cleanupFuncs := setupReceivers(t, data, mc, mcjoinWaitTimeout, &wg) + receiverNames, cleanupFuncs := setupReceivers(t, data, mc, mcjoinWaitTimeout, testNamespace, &wg) for _, cleanupFunc := range cleanupFuncs { defer cleanupFunc() } @@ -3996,9 +3994,9 @@ func testACNPMulticastEgress(t *testing.T, data *TestData, acnpName, caseName, g data.RunCommandFromPod(testNamespace, senderName, mcjoinContainerName, sendMulticastCommand) }() // check if receiver can receive multicast packet - tcpdumpName, _, cleanupFunc := createAndWaitForPod(t, data, data.createToolboxPodOnNode, "test-tcpdump-", nodeName(mc.receiverConfigs[0].nodeIdx), testNamespace, true) + tcpdumpName, _, cleanupFunc := createAndWaitForPod(t, data, data.createToolboxPodOnNode, "test-tcpdump-", nodeName(mc.receiverConfigs[0].nodeIdx), data.testNamespace, true) defer cleanupFunc() - cmd, err := generatePacketCaptureCmd(t, data, 5, mc.group.String(), nodeName(mc.receiverConfigs[0].nodeIdx), receiverNames[0]) + cmd, err := generatePacketCaptureCmd(t, data, 5, mc.group.String(), nodeName(mc.receiverConfigs[0].nodeIdx), receiverNames[0], testNamespace) if err != nil { t.Fatalf("failed to call generateConnCheckCmd: %v", err) } @@ -4084,9 +4082,9 @@ func checkAuditLoggingResult(t *testing.T, data *TestData, nodeName, logLocator } } -func generatePacketCaptureCmd(t *testing.T, data *TestData, timeout int, hostIP, nodeName, podName string) (string, error) { +func generatePacketCaptureCmd(t *testing.T, data *TestData, timeout int, hostIP, nodeName, podName string, testNamespace string) (string, error) { agentPodName := getAntreaPodName(t, data, nodeName) - cmds := []string{"antctl", "get", "podinterface", podName, "-n", data.testNamespace, "-o", "json"} + cmds := []string{"antctl", "get", "podinterface", podName, "-n", testNamespace, "-o", "json"} stdout, stderr, err := runAntctl(agentPodName, cmds, data) var podInterfaceInfo []podinterface.Response if err := json.Unmarshal([]byte(stdout), &podInterfaceInfo); err != nil { @@ -4448,14 +4446,18 @@ func TestAntreaPolicy(t *testing.T) { t.Run("TestMulticastNP", func(t *testing.T) { skipIfMulticastDisabled(t, data) - t.Run("Case=MulticastNPIGMPQueryAllow", func(t *testing.T) { testACNPIGMPQueryAllow(t, data) }) - t.Run("Case=MulticastNPIGMPQueryDrop", func(t *testing.T) { testACNPIGMPQueryDrop(t, data) }) - t.Run("Case=MulticastNPPolicyEgressAllow", func(t *testing.T) { testACNPMulticastEgressAllow(t, data) }) - t.Run("Case=MulticastNPPolicyEgressDrop", func(t *testing.T) { testACNPMulticastEgressDrop(t, data) }) + testMulticastNP(t, data, data.testNamespace) }) k8sUtils.Cleanup(namespaces) } +func testMulticastNP(t *testing.T, data *TestData, testNamespace string) { + t.Run("Case=MulticastNPIGMPQueryAllow", func(t *testing.T) { testACNPIGMPQueryAllow(t, data, testNamespace) }) + t.Run("Case=MulticastNPIGMPQueryDrop", func(t *testing.T) { testACNPIGMPQueryDrop(t, data, testNamespace) }) + t.Run("Case=MulticastNPPolicyEgressAllow", func(t *testing.T) { testACNPMulticastEgressAllow(t, data, testNamespace) }) + t.Run("Case=MulticastNPPolicyEgressDrop", func(t *testing.T) { testACNPMulticastEgressDrop(t, data, testNamespace) }) +} + func TestAntreaPolicyStatus(t *testing.T) { skipIfHasWindowsNodes(t) skipIfAntreaPolicyDisabled(t) diff --git a/test/e2e/multicast_test.go b/test/e2e/multicast_test.go index d336014f5c1..72f6bcf8666 100644 --- a/test/e2e/multicast_test.go +++ b/test/e2e/multicast_test.go @@ -58,7 +58,16 @@ func TestMulticast(t *testing.T) { defer teardownTest(t, data) skipIfMulticastDisabled(t, data) - nodeMulticastInterfaces, err := computeMulticastInterfaces(t, data) + runMulticastTestCases(t, data, data.testNamespace) +} + +func runMulticastTestCases(t *testing.T, data *TestData, testNamespace string) { + var err error + transportInterface, err := data.GetTransportInterface() + if err != nil { + t.Fatalf("Error getting transport interfaces: %v", err) + } + nodeMulticastInterfaces, err := computeMulticastInterfaces(t, data, transportInterface) if err != nil { t.Fatalf("Error computing multicast interfaces: %v", err) } @@ -67,14 +76,18 @@ func TestMulticast(t *testing.T) { t.Fatalf("Failed to get encap mode: %v", err) } // Only check receiver router in noEncap mode. - checkReceiverRoute := encapMode == config.TrafficEncapModeNoEncap - runMulticastTestCases(t, data, nodeMulticastInterfaces, checkReceiverRoute) -} - -func runMulticastTestCases(t *testing.T, data *TestData, nodeMulticastInterfaces map[int][]string, checkReceiverRoute bool) { + checkReceiverRoute := encapMode == config.TrafficEncapModeNoEncap && encapMode != config.TrafficEncapModeNoEncap + checkSenderRoute := !testOptions.enableAntreaIPAM t.Run("testMulticastBetweenPodsInTwoNodes", func(t *testing.T) { skipIfNumNodesLessThan(t, 2) testcases := []multicastTestcase{ + { + name: "testMulticastForLocalPodsWithHostNetworkSender", + senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: true}, + receiverConfigs: []multicastTestPodConfig{{0, false}}, + port: 3455, + group: net.ParseIP("224.3.4.4"), + }, { name: "testMulticastForLocalPods", senderConfig: multicastTestPodConfig{nodeIdx: 0, isHostNetwork: false}, @@ -108,7 +121,7 @@ func runMulticastTestCases(t *testing.T, data *TestData, nodeMulticastInterfaces mc := mc t.Run(mc.name, func(t *testing.T) { t.Parallel() - runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces, checkReceiverRoute) + runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces, testNamespace, transportInterface, checkReceiverRoute, checkSenderRoute) }) } }) @@ -148,7 +161,7 @@ func runMulticastTestCases(t *testing.T, data *TestData, nodeMulticastInterfaces mc := mc t.Run(mc.name, func(t *testing.T) { t.Parallel() - runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces, checkReceiverRoute) + runTestMulticastBetweenPods(t, data, mc, nodeMulticastInterfaces, testNamespace, transportInterface, checkReceiverRoute, checkSenderRoute) }) } }) @@ -156,6 +169,7 @@ func runMulticastTestCases(t *testing.T, data *TestData, nodeMulticastInterfaces // Skip this case with encap mode because iptables masquerade is configured, and it leads the multicast packet // sent from Pod are not able to forwarded to more than one network interface on the host. skipIfEncapModeIsNot(t, data, config.TrafficEncapModeNoEncap) + skipIfAntreaIPAMTest(t) multipleInterfacesFound := false var nodeIdx int for i, ifaces := range nodeMulticastInterfaces { @@ -277,7 +291,7 @@ func runMulticastTestCases(t *testing.T, data *TestData, nodeMulticastInterfaces for _, mc := range testcases { mc := mc t.Run(mc.name, func(t *testing.T) { - testMulticastStatsWithSendersReceivers(t, data, mc) + testMulticastStatsWithSendersReceivers(t, data, testNamespace, mc) }) } }) @@ -336,16 +350,16 @@ type ruleConfig struct { // testMulticastStatsWithSendersReceivers tests multiple multicast senders and receivers cases with specified AntreaNetworkPolicies which may drop/allow IGMP or Multicast traffic. // It checks the results of all the multicaststats-related commands, including kubectl get multicastgroups, antctl get podmulticaststats and kubectl get antreanetworkpolicystats. -func testMulticastStatsWithSendersReceivers(t *testing.T, data *TestData, mc multicastStatsTestcase) { +func testMulticastStatsWithSendersReceivers(t *testing.T, data *TestData, testNamespace string, mc multicastStatsTestcase) { mcjoinWaitTimeout := defaultTimeout / time.Second for _, senderConfig := range mc.senderConfigs { - _, _, cleanupFunc := createAndWaitForPodWithExactName(t, data, data.createMcJoinPodOnNode, senderConfig.name, senderConfig.nodeName, data.testNamespace, false) + _, _, cleanupFunc := createAndWaitForPodWithExactName(t, data, data.createMcJoinPodOnNode, senderConfig.name, senderConfig.nodeName, testNamespace, false) defer cleanupFunc() } for _, receiverConfig := range mc.receiverConfigs { - _, _, cleanupFunc := createAndWaitForPodWithExactName(t, data, data.createMcJoinPodOnNode, receiverConfig.name, receiverConfig.nodeName, data.testNamespace, false) + _, _, cleanupFunc := createAndWaitForPodWithExactName(t, data, data.createMcJoinPodOnNode, receiverConfig.name, receiverConfig.nodeName, testNamespace, false) defer cleanupFunc() } @@ -355,7 +369,7 @@ func testMulticastStatsWithSendersReceivers(t *testing.T, data *TestData, mc mul for _, anp := range mc.multicastANPConfigs { np := &crdv1beta1.NetworkPolicy{ - ObjectMeta: metav1.ObjectMeta{Namespace: data.testNamespace, Name: anp.name, Labels: map[string]string{"antrea-e2e": anp.name}}, + ObjectMeta: metav1.ObjectMeta{Namespace: testNamespace, Name: anp.name, Labels: map[string]string{"antrea-e2e": anp.name}}, Spec: crdv1beta1.NetworkPolicySpec{ Priority: p10, AppliedTo: []crdv1beta1.AppliedTo{ @@ -380,16 +394,16 @@ func testMulticastStatsWithSendersReceivers(t *testing.T, data *TestData, mc mul if _, err = k8sUtils.CreateOrUpdateANNP(np); err != nil { t.Fatalf("Creating ANP %s failed: %v", np.Name, err) } - err = data.waitForANNPRealized(t, data.testNamespace, np.Name, policyRealizedTimeout) + err = data.waitForANNPRealized(t, testNamespace, np.Name, policyRealizedTimeout) if err != nil { t.Fatalf("Error when waiting for ANP %s to be realized: %v", np.Name, err) } - defer data.DeleteANNP(data.testNamespace, anp.name) + defer data.DeleteANNP(testNamespace, anp.name) } for _, anp := range mc.igmpANPConfigs { np := &crdv1beta1.NetworkPolicy{ - ObjectMeta: metav1.ObjectMeta{Namespace: data.testNamespace, Name: anp.name, Labels: map[string]string{"antrea-e2e": anp.name}}, + ObjectMeta: metav1.ObjectMeta{Namespace: testNamespace, Name: anp.name, Labels: map[string]string{"antrea-e2e": anp.name}}, Spec: crdv1beta1.NetworkPolicySpec{ Priority: p10, AppliedTo: []crdv1beta1.AppliedTo{ @@ -419,18 +433,18 @@ func testMulticastStatsWithSendersReceivers(t *testing.T, data *TestData, mc mul if _, err = k8sUtils.CreateOrUpdateANNP(np); err != nil { t.Fatalf("Creating ANP %s failed: %v", np.Name, err) } - err = data.waitForANNPRealized(t, data.testNamespace, np.Name, policyRealizedTimeout) + err = data.waitForANNPRealized(t, testNamespace, np.Name, policyRealizedTimeout) if err != nil { t.Fatalf("Error when waiting for ANP %s released: %v", np.Name, err) } - defer data.DeleteANNP(data.testNamespace, anp.name) + defer data.DeleteANNP(testNamespace, anp.name) } for _, receiverConfig := range mc.receiverConfigs { for _, addr := range receiverConfig.IPs { go func(receiver, addr string) { cmd := []string{"/bin/sh", "-c", fmt.Sprintf("mcjoin -o -p %d -W %d %s", 2234, mcjoinWaitTimeout, addr)} - data.RunCommandFromPod(data.testNamespace, receiver, mcjoinContainerName, cmd) + data.RunCommandFromPod(testNamespace, receiver, mcjoinContainerName, cmd) }(receiverConfig.name, addr) } } @@ -445,7 +459,7 @@ func testMulticastStatsWithSendersReceivers(t *testing.T, data *TestData, mc mul go func(sender, addr string, sessions int) { defer wg.Done() cmd := []string{"/bin/sh", "-c", fmt.Sprintf("mcjoin -f 500 -o -p %d -s -t 30 -c %d -W %d %s", 2234, sessions, mcjoinWaitTimeout, addr)} - data.RunCommandFromPod(data.testNamespace, sender, mcjoinContainerName, cmd) + data.RunCommandFromPod(testNamespace, sender, mcjoinContainerName, cmd) }(senderConfig.name, addr, senderConfig.sendSessions) } } @@ -459,7 +473,7 @@ func testMulticastStatsWithSendersReceivers(t *testing.T, data *TestData, mc mul if err != nil { t.Fatalf("Error getting getAntreaPodOnNode for %s", senderConfig.name) } - matches, err := checkAntctlResult(t, data, antreaPod, senderConfig.name, stats.Inbound, stats.Outbound) + matches, err := checkAntctlResult(t, data, antreaPod, senderConfig.name, testNamespace, stats.Inbound, stats.Outbound) if err != nil || !matches { return false, err } @@ -473,13 +487,13 @@ func testMulticastStatsWithSendersReceivers(t *testing.T, data *TestData, mc mul if err != nil { t.Fatalf("Error getting getAntreaPodOnNode for %s", receiverConfig.name) } - matches, err := checkAntctlResult(t, data, antreaPod, receiverConfig.name, stats.Inbound, stats.Outbound) + matches, err := checkAntctlResult(t, data, antreaPod, receiverConfig.name, testNamespace, stats.Inbound, stats.Outbound) if err != nil || !matches { return false, err } } for _, anp := range mc.igmpANPConfigs { - stats, err := data.crdClient.StatsV1alpha1().AntreaNetworkPolicyStats(data.testNamespace).Get(context.TODO(), anp.name, metav1.GetOptions{}) + stats, err := data.crdClient.StatsV1alpha1().AntreaNetworkPolicyStats(testNamespace).Get(context.TODO(), anp.name, metav1.GetOptions{}) if err != nil { return false, err } @@ -500,7 +514,7 @@ func testMulticastStatsWithSendersReceivers(t *testing.T, data *TestData, mc mul } } for _, anp := range mc.multicastANPConfigs { - stats, err := data.crdClient.StatsV1alpha1().AntreaNetworkPolicyStats(data.testNamespace).Get(context.TODO(), anp.name, metav1.GetOptions{}) + stats, err := data.crdClient.StatsV1alpha1().AntreaNetworkPolicyStats(testNamespace).Get(context.TODO(), anp.name, metav1.GetOptions{}) if err != nil { return false, err } @@ -577,7 +591,7 @@ func testMulticastForwardToMultipleInterfaces(t *testing.T, data *TestData, send } } -func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string, checkReceiverRoute bool) { +func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string, testNamespace string, transportInterface string, checkReceiverRoute bool, checkSenderRoute bool) { currentEncapMode, _ := data.GetEncapMode() if requiresExternalHostSupport(mc) && currentEncapMode == config.TrafficEncapModeEncap { t.Skipf("Multicast does not support using hostNetwork Pod to simulate the external host with encap mode, skip the case") @@ -585,10 +599,10 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc mcjoinWaitTimeout := defaultTimeout / time.Second gatewayInterface, err := data.GetGatewayInterfaceName(antreaNamespace) failOnError(err, t) - senderName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-sender-", nodeName(mc.senderConfig.nodeIdx), data.testNamespace, mc.senderConfig.isHostNetwork) + senderName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-sender-", nodeName(mc.senderConfig.nodeIdx), testNamespace, mc.senderConfig.isHostNetwork) defer cleanupFunc() var wg sync.WaitGroup - _, cleanupFuncs := setupReceivers(t, data, mc, mcjoinWaitTimeout, &wg) + _, cleanupFuncs := setupReceivers(t, data, mc, mcjoinWaitTimeout, testNamespace, &wg) for _, cleanupFunc := range cleanupFuncs { defer cleanupFunc() } @@ -597,7 +611,7 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc // It sends two multicast packets for every second(-f 500 means it takes 500 milliseconds for sending one packet). sendMulticastCommand := []string{"/bin/sh", "-c", fmt.Sprintf("mcjoin -f 500 -o -p %d -s -t 3 -w 2 -W %d %s", mc.port, mcjoinWaitTimeout, mc.group.String())} go func() { - data.RunCommandFromPod(data.testNamespace, senderName, mcjoinContainerName, sendMulticastCommand) + data.RunCommandFromPod(testNamespace, senderName, mcjoinContainerName, sendMulticastCommand) }() runCmdWithOutputFilters := func(nodeName string, cmd []string, outputFilters ...string) ([]string, error) { @@ -631,7 +645,7 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc readyReceivers := sets.New[int]() senderReady := false if err := wait.Poll(3*time.Second, defaultTimeout, func() (bool, error) { - if !senderReady { + if checkSenderRoute && !senderReady { // Sender pods should add an outbound multicast route except when running as HostNetwork. mRoutesResult, err := getMroutes(nodeName(mc.senderConfig.nodeIdx), gatewayInterface, mc.group.String(), strings.Join(nodeMulticastInterfaces[mc.senderConfig.nodeIdx], " ")) if err != nil { @@ -654,24 +668,24 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc if readyReceivers.Has(receiver.nodeIdx) { continue } - for _, receiverMulticastInterface := range nodeMulticastInterfaces[receiver.nodeIdx] { - if checkReceiverRoute { - mRoutesResult, err := getMroutes(nodeName(receiver.nodeIdx), receiverMulticastInterface, mc.group.String()) - if err != nil { - return false, err + if checkReceiverRoute { + mRoutesResult, err := getMroutes(nodeName(receiver.nodeIdx), transportInterface, mc.group.String()) + if err != nil { + return false, err + } + // If multicast traffic is sent from non-HostNetwork pods and senders-receivers are located in different nodes, + // the receivers should configure corresponding inbound multicast routes. + if (mc.senderConfig.nodeIdx != receiver.nodeIdx || mc.senderConfig.isHostNetwork) && !receiver.isHostNetwork { + if len(mRoutesResult) == 0 { + return false, nil } - // If multicast traffic is sent from non-HostNetwork pods and senders-receivers are located in different nodes, - // the receivers should configure corresponding inbound multicast routes. - if mc.senderConfig.nodeIdx != receiver.nodeIdx && !receiver.isHostNetwork { - if len(mRoutesResult) == 0 { - return false, nil - } - } else { - if len(mRoutesResult) != 0 { - return false, nil - } + } else { + if len(mRoutesResult) != 0 { + return false, nil } } + } + for _, receiverMulticastInterface := range nodeMulticastInterfaces[receiver.nodeIdx] { mAddrsResult, err := getMaddrs(nodeName(receiver.nodeIdx), receiverMulticastInterface, mc.group.String()) if err != nil { return false, err @@ -698,11 +712,11 @@ func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestc wg.Wait() } -func setupReceivers(t *testing.T, data *TestData, mc multicastTestcase, mcjoinWaitTimeout time.Duration, wg *sync.WaitGroup) ([]string, []func()) { +func setupReceivers(t *testing.T, data *TestData, mc multicastTestcase, mcjoinWaitTimeout time.Duration, testNamespace string, wg *sync.WaitGroup) ([]string, []func()) { receiverNames := make([]string, 0) cleanupFuncs := []func(){} for _, receiver := range mc.receiverConfigs { - receiverName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-receiver-", nodeName(receiver.nodeIdx), data.testNamespace, receiver.isHostNetwork) + receiverName, _, cleanupFunc := createAndWaitForPod(t, data, data.createMcJoinPodOnNode, "test-receiver-", nodeName(receiver.nodeIdx), testNamespace, receiver.isHostNetwork) receiverNames = append(receiverNames, receiverName) cleanupFuncs = append(cleanupFuncs, cleanupFunc) } @@ -715,7 +729,7 @@ func setupReceivers(t *testing.T, data *TestData, mc multicastTestcase, mcjoinWa // The following command joins a multicast group and sets the timeout to 100 seconds(-W 100) before exit. // The command will return after receiving 10 packet(-c 10). receiveMulticastCommand := []string{"/bin/sh", "-c", fmt.Sprintf("mcjoin -c 10 -o -p %d -W %d %s", mc.port, mcjoinWaitTimeout, mc.group.String())} - res, _, err := data.RunCommandFromPod(data.testNamespace, r, mcjoinContainerName, receiveMulticastCommand) + res, _, err := data.RunCommandFromPod(testNamespace, r, mcjoinContainerName, receiveMulticastCommand) failOnError(err, t) assert.Contains(t, res, "Total: 10 packets") }() @@ -725,15 +739,12 @@ func setupReceivers(t *testing.T, data *TestData, mc multicastTestcase, mcjoinWa // computeMulticastInterfaces computes multicastInterfaces for each node. // It returns [][]string with its index as node index and value as multicastInterfaces for this node. -func computeMulticastInterfaces(t *testing.T, data *TestData) (map[int][]string, error) { +func computeMulticastInterfaces(t *testing.T, data *TestData, transportInterface string) (map[int][]string, error) { multicastInterfaces, err := data.GetMulticastInterfaces(antreaNamespace) if err != nil { return nil, err } - transportInterface, err := data.GetTransportInterface() - if err != nil { - t.Fatalf("Error getting transport interfaces: %v", err) - } + nodeMulticastInterfaces := make(map[int][]string) for nodeIdx := range clusterInfo.nodes { _, localInterfacesStr, _, err := data.RunCommandOnNode(nodeName(nodeIdx), "ls /sys/class/net") @@ -751,7 +762,7 @@ func computeMulticastInterfaces(t *testing.T, data *TestData) (map[int][]string, return nodeMulticastInterfaces, nil } -func checkAntctlResult(t *testing.T, data *TestData, antreaPodName, containerPodName string, inbound, outbound uint64) (bool, error) { +func checkAntctlResult(t *testing.T, data *TestData, antreaPodName, containerPodName string, testNamespace string, inbound, outbound uint64) (bool, error) { antctlCmds := []string{"antctl", "get", "podmulticaststats"} stdout, stderr, err := runAntctl(antreaPodName, antctlCmds, data) if err != nil { @@ -759,7 +770,7 @@ func checkAntctlResult(t *testing.T, data *TestData, antreaPodName, containerPod return false, err } t.Logf("The result of running antctl get podmulticaststats in %s is stdout: %s, stderr: %s, err: %v", antreaPodName, stdout, stderr, err) - match, _ := regexp.MatchString(fmt.Sprintf("%s[[:space:]]+%s[[:space:]]+%d[[:space:]]+%d", data.testNamespace, containerPodName, inbound, outbound), strings.TrimSpace(stdout)) + match, _ := regexp.MatchString(fmt.Sprintf("%s[[:space:]]+%s[[:space:]]+%d[[:space:]]+%d", testNamespace, containerPodName, inbound, outbound), strings.TrimSpace(stdout)) return match, nil }