From e41efa88d2e176ca1f063d3a4aff6a7c87700917 Mon Sep 17 00:00:00 2001 From: ceclinux Date: Mon, 13 Feb 2023 00:23:07 +0800 Subject: [PATCH] [Multicast] Support removal of stale multicast routes Check the packet count difference every minute for each multicast route and remove the ones that have identical packet count in the past 10 minutes. Signed-off-by: ceclinux --- pkg/agent/multicast/mcast_controller.go | 6 +- pkg/agent/multicast/mcast_route.go | 88 +++++++++-- pkg/agent/multicast/mcast_route_linux.go | 68 +++++++++ pkg/agent/multicast/mcast_route_test.go | 139 +++++++++++++++++- pkg/agent/multicast/mcast_socket_linux.go | 18 ++- pkg/agent/multicast/mcast_socket_others.go | 8 +- pkg/agent/multicast/testing/mock_multicast.go | 15 ++ pkg/agent/util/syscall/linux/types.go | 1 + pkg/agent/util/syscall/syscall_unix.go | 12 +- pkg/agent/util/syscall/ztypes_linux.go | 15 +- 10 files changed, 341 insertions(+), 29 deletions(-) diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index 9de45eff092..7c3ecae5efe 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -112,7 +112,6 @@ func (c *Controller) addGroupMemberStatus(e *mcastGroupEvent) { c.groupCache.Add(status) c.queue.Add(e.group.String()) klog.InfoS("Added new multicast group to cache", "group", e.group, "interface", e.iface.InterfaceName) - return } // updateGroupMemberStatus updates the group status in groupCache. If a "join" message is sent from an existing member, @@ -161,7 +160,6 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent } } } - return } // checkLastMember sends out a query message on the group to check if there are still members in the group. If no new @@ -285,7 +283,7 @@ func NewMulticastController(ofClient openflow.Client, groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) - multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces, isEncap, enableFlexibleIPAM) + multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces, enableFlexibleIPAM) c := &Controller{ ofClient: ofClient, ifaceStore: ifaceStore, @@ -480,7 +478,7 @@ func (c *Controller) syncGroup(groupKey string) error { deleteLocalMulticastGroup := func() error { err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) if err != nil { - klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey) + klog.ErrorS(err, "Failed to delete multicast group", "group", groupKey) return err } klog.InfoS("Removed multicast route entry", "group", status.group) diff --git a/pkg/agent/multicast/mcast_route.go b/pkg/agent/multicast/mcast_route.go index fd5d481549a..aa5c8260ae8 100644 --- a/pkg/agent/multicast/mcast_route.go +++ b/pkg/agent/multicast/mcast_route.go @@ -19,6 +19,7 @@ import ( "fmt" "net" "strings" + "time" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" @@ -34,13 +35,14 @@ const ( MulticastRecvBufferSize = 128 ) -func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.Set[string], encapEnabled bool, flexibleIPAMEnabled bool) *MRouteClient { +func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.Set[string], flexibleIPAMEnabled bool) *MRouteClient { var m = &MRouteClient{ igmpMsgChan: make(chan []byte, workerCount), nodeConfig: nodeconfig, groupCache: groupCache, inboundRouteCache: cache.NewIndexer(getMulticastInboundEntryKey, cache.Indexers{GroupNameIndexName: inboundGroupIndexFunc}), multicastInterfaces: sets.List(multicastInterfaces), + outboundRouteCache: cache.NewIndexer(getMulticastOutboundEntryKey, cache.Indexers{}), socket: multicastSocket, flexibleIPAMEnabled: flexibleIPAMEnabled, } @@ -75,6 +77,7 @@ type MRouteClient struct { nodeConfig *config.NodeConfig multicastInterfaces []string inboundRouteCache cache.Indexer + outboundRouteCache cache.Indexer groupCache cache.Indexer socket RouteInterface multicastInterfaceConfigs []multicastInterfaceConfig @@ -152,23 +155,48 @@ func (c *MRouteClient) deleteInboundMrouteEntryByGroup(group net.IP) (err error) mEntries, _ := c.inboundRouteCache.ByIndex(GroupNameIndexName, group.String()) for _, route := range mEntries { entry := route.(*inboundMulticastRouteEntry) - err := c.socket.DelMrouteEntry(net.ParseIP(entry.src), net.ParseIP(entry.group), entry.vif) + err := c.deleteInboundMRoute(entry) if err != nil { return err } - c.inboundRouteCache.Delete(route) } return nil } +func (c *MRouteClient) deleteInboundMRoute(mRoute *inboundMulticastRouteEntry) error { + err := c.socket.DelMrouteEntry(net.ParseIP(mRoute.src), net.ParseIP(mRoute.group), mRoute.vif) + if err != nil { + return err + } + c.inboundRouteCache.Delete(mRoute) + return nil +} + +func (c *MRouteClient) deleteOutboundMRoute(mRoute *outboundMulticastRouteEntry) error { + err := c.socket.DelMrouteEntry(net.ParseIP(mRoute.src), net.ParseIP(mRoute.group), c.internalInterfaceVIF) + if err != nil { + return err + } + c.outboundRouteCache.Delete(mRoute) + return nil +} + // addOutboundMrouteEntry configures multicast route from Antrea gateway to all the multicast interfaces, // allowing multicast srcNode Pods to send multicast traffic to external. -func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err error) { +func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) error { klog.V(2).InfoS("Adding outbound multicast route entry", "src", src, "group", group, "outboundVIFs", c.externalInterfaceVIFs) - err = c.socket.AddMrouteEntry(src, group, c.internalInterfaceVIF, c.externalInterfaceVIFs) + err := c.socket.AddMrouteEntry(src, group, c.internalInterfaceVIF, c.externalInterfaceVIFs) if err != nil { return err } + routeEntry := &outboundMulticastRouteEntry{ + multicastRouteEntry: multicastRouteEntry{ + group: group.String(), + src: src.String(), + updatedTime: time.Now(), + }, + } + c.outboundRouteCache.Add(routeEntry) return nil } @@ -181,15 +209,43 @@ func (c *MRouteClient) addInboundMrouteEntry(src net.IP, group net.IP, inboundVI return err } routeEntry := &inboundMulticastRouteEntry{ - group: group.String(), - src: src.String(), - vif: inboundVIF, + vif: inboundVIF, + multicastRouteEntry: multicastRouteEntry{ + group: group.String(), + src: src.String(), + updatedTime: time.Now(), + }, } c.inboundRouteCache.Add(routeEntry) return nil } +// Field pktCount and updatedTime are used for removing stale multicast routes. +type multicastRouteEntry struct { + group string + src string + pktCount uint32 + updatedTime time.Time +} + +// outboundMulticastRouteEntry encodes the outbound multicast routing entry. +// For example, +// +// type outboundMulticastRouteEntry struct { +// group "226.94.9.9" +// src "10.0.0.55" +// } encodes the multicast route entry from Antrea gateway to multicast interfaces +// +// (10.0.0.55,226.94.9.9) Iif: antrea-gw0 Oifs: list of multicastInterfaces. +// +// The iif is always Antrea gateway and oifs are always outbound interfaces +// so we do not put them in the struct. +type outboundMulticastRouteEntry struct { + multicastRouteEntry +} + // inboundMulticastRouteEntry encodes the inbound multicast routing entry. +// It has extra field vif to represent inbound interface VIF. // For example, // // type inboundMulticastRouteEntry struct { @@ -201,9 +257,8 @@ func (c *MRouteClient) addInboundMrouteEntry(src net.IP, group net.IP, inboundVI // (10.0.0.55,226.94.9.9) Iif: wlan0 Oifs: antrea-gw0. // The oif is always Antrea gateway so we do not put it in the struct. type inboundMulticastRouteEntry struct { - group string - src string - vif uint16 + multicastRouteEntry + vif uint16 } func getMulticastInboundEntryKey(obj interface{}) (string, error) { @@ -211,6 +266,11 @@ func getMulticastInboundEntryKey(obj interface{}) (string, error) { return entry.group + "/" + entry.src + "/" + fmt.Sprint(entry.vif), nil } +func getMulticastOutboundEntryKey(obj interface{}) (string, error) { + entry := obj.(*outboundMulticastRouteEntry) + return entry.group + "/" + entry.src, nil +} + func inboundGroupIndexFunc(obj interface{}) ([]string, error) { entry, ok := obj.(*inboundMulticastRouteEntry) if !ok { @@ -277,10 +337,12 @@ type RouteInterface interface { MulticastInterfaceLeaveMgroup(mgroup net.IP, ifaceIP net.IP, ifaceName string) error // AddMrouteEntry adds multicast route with specified source(src), multicast group IP(group), // inbound multicast interface(iif) and outbound multicast interfaces(oifs). - AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) (err error) + AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) error + // GetMroutePacketCount returns the number of routed packets by the multicast route entry. + GetMroutePacketCount(src net.IP, group net.IP) (uint32, error) // DelMrouteEntry deletes multicast route with specified source(src), multicast group IP(group), // inbound multicast interface(iif). - DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) + DelMrouteEntry(src net.IP, group net.IP, iif uint16) error // FlushMRoute flushes static multicast routing entries. FlushMRoute() // GetFD returns socket file descriptor. diff --git a/pkg/agent/multicast/mcast_route_linux.go b/pkg/agent/multicast/mcast_route_linux.go index 9695564bee2..1df6e4f958b 100644 --- a/pkg/agent/multicast/mcast_route_linux.go +++ b/pkg/agent/multicast/mcast_route_linux.go @@ -21,12 +21,18 @@ import ( "fmt" "net" "syscall" + "time" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "antrea.io/antrea/pkg/util/runtime" ) +const ( + mRouteTimeout = time.Minute * 10 +) + // parseIGMPMsg parses the kernel version into parsedIGMPMsg. Note we need to consider the change // after linux 5.9 in the igmpmsg struct when parsing vif. Please check // https://github.com/torvalds/linux/commit/c8715a8e9f38906e73d6d78764216742db13ba0e. @@ -78,6 +84,12 @@ func (c *MRouteClient) run(stopCh <-chan struct{}) { } }() + // Check packet count difference every minute for each multicast route and + // remove ones that do not route any packets in past mRouteTimeout. + // The remaining multicast routes' statistics are getting updated by + // this process as well. + go wait.NonSlidingUntil(c.updateMrouteStats, time.Minute, stopCh) + for i := 0; i < int(workerCount); i++ { go c.worker(stopCh) } @@ -85,3 +97,59 @@ func (c *MRouteClient) run(stopCh <-chan struct{}) { c.socket.FlushMRoute() syscall.Close(c.socket.GetFD()) } + +func (c *MRouteClient) updateMulticastRouteStatsEntry(entry *multicastRouteEntry) (isStale bool, newEntry *multicastRouteEntry) { + packetCount, err := c.socket.GetMroutePacketCount(net.ParseIP(entry.src), net.ParseIP(entry.group)) + if err != nil { + klog.ErrorS(err, "Failed to get packet count for multicast route", "route", entry) + return false, nil + } + packetCountDiff := packetCount - entry.pktCount + klog.V(4).Infof("Multicast route %v routes %d packets in last %s", entry, packetCountDiff, time.Minute) + now := time.Now() + if packetCountDiff == uint32(0) { + return now.Sub(entry.updatedTime) > mRouteTimeout, nil + } + newEntry = &multicastRouteEntry{group: entry.group, src: entry.src, pktCount: packetCount, updatedTime: now} + return false, newEntry +} + +func (c *MRouteClient) updateInboundMrouteStats() { + for _, obj := range c.inboundRouteCache.List() { + entry := obj.(*inboundMulticastRouteEntry) + isStale, newEntry := c.updateMulticastRouteStatsEntry(&entry.multicastRouteEntry) + if isStale { + klog.V(2).InfoS("Deleting stale inbound multicast route", "group", entry.group, "source", entry.src, "VIF", entry.vif) + err := c.deleteInboundMRoute(entry) + if err != nil { + klog.ErrorS(err, "Failed to delete inbound multicast route", "group", entry.group, "source", entry.src, "VIF", entry.vif) + } + } else if newEntry != nil { + newInboundEntry := inboundMulticastRouteEntry{*newEntry, entry.vif} + c.inboundRouteCache.Update(&newInboundEntry) + } + } +} + +func (c *MRouteClient) updateOutboundMrouteStats() { + for _, obj := range c.outboundRouteCache.List() { + entry := obj.(*outboundMulticastRouteEntry) + isStale, newEntry := c.updateMulticastRouteStatsEntry(&entry.multicastRouteEntry) + if isStale { + klog.V(2).InfoS("Deleting stale outbound multicast route", "group", entry.group, "source", entry.src) + err := c.deleteOutboundMRoute(entry) + if err != nil { + klog.ErrorS(err, "Failed to delete outbound multicast route", "group", entry.group, "source", entry.src) + } + } else if newEntry != nil { + newOutboundEntry := outboundMulticastRouteEntry{*newEntry} + c.outboundRouteCache.Update(&newOutboundEntry) + } + } +} + +func (c *MRouteClient) updateMrouteStats() { + klog.V(2).InfoS("Updating multicast route statistics and removing stale multicast routes") + c.updateInboundMrouteStats() + c.updateOutboundMrouteStats() +} diff --git a/pkg/agent/multicast/mcast_route_test.go b/pkg/agent/multicast/mcast_route_test.go index 2a5ffbc38c6..bdcc7a0fad2 100644 --- a/pkg/agent/multicast/mcast_route_test.go +++ b/pkg/agent/multicast/mcast_route_test.go @@ -109,20 +109,20 @@ func TestDeleteInboundMrouteEntryByGroup(t *testing.T) { name: "two entries matched", group: net.ParseIP("224.3.4.5"), currRouteEntries: []inboundMulticastRouteEntry{ - {group: "224.3.4.5", src: "10.3.4.6", vif: 1}, - {group: "224.3.4.5", src: "10.3.4.7", vif: 2}, - {group: "224.3.4.7", src: "10.3.4.7", vif: 2}, + {multicastRouteEntry: multicastRouteEntry{group: "224.3.4.5", src: "10.3.4.6"}, vif: 1}, + {multicastRouteEntry: multicastRouteEntry{group: "224.3.4.5", src: "10.3.4.7"}, vif: 2}, + {multicastRouteEntry: multicastRouteEntry{group: "224.3.4.7", src: "10.3.4.7"}, vif: 2}, }, deletedRouteEntries: []inboundMulticastRouteEntry{ - {group: "224.3.4.5", src: "10.3.4.6", vif: 1}, - {group: "224.3.4.5", src: "10.3.4.7", vif: 2}, + {multicastRouteEntry: multicastRouteEntry{group: "224.3.4.5", src: "10.3.4.6"}, vif: 1}, + {multicastRouteEntry: multicastRouteEntry{group: "224.3.4.5", src: "10.3.4.7"}, vif: 2}, }, }, { name: "no entry match", group: net.ParseIP("224.3.4.6"), currRouteEntries: []inboundMulticastRouteEntry{ - {group: "224.3.4.5", src: "10.3.4.6", vif: 1}, + {multicastRouteEntry: multicastRouteEntry{group: "224.3.4.5", src: "10.3.4.6"}, vif: 1}, }, deletedRouteEntries: []inboundMulticastRouteEntry{}}, } { @@ -139,6 +139,131 @@ func TestDeleteInboundMrouteEntryByGroup(t *testing.T) { } } +func TestUpdateOutboundMrouteStats(t *testing.T) { + mRoute := newMockMulticastRouteClient(t) + err := mRoute.initialize(t) + require.NoError(t, err) + now := time.Now() + for _, m := range []struct { + isStale bool + currStats uint32 + group string + source string + packetCount uint32 + createdTime time.Time + }{ + { + group: "224.3.5.7", + source: "10.1.2.3", + createdTime: now, + isStale: false, + currStats: 0, + }, + { + group: "224.3.5.8", + source: "10.1.2.4", + createdTime: now.Add(time.Duration(-mRouteTimeout)), + packetCount: 10, + isStale: false, + currStats: 9, + }, + { + group: "224.3.5.9", + source: "10.1.2.5", + createdTime: now.Add(time.Duration(-mRouteTimeout)), + packetCount: 0, + isStale: true, + currStats: 0, + }, + } { + outboundMrouteEntry := &outboundMulticastRouteEntry{ + multicastRouteEntry: multicastRouteEntry{ + src: m.source, + group: m.group, + pktCount: m.packetCount, + updatedTime: m.createdTime, + }, + } + mRoute.outboundRouteCache.Add(outboundMrouteEntry) + mockMulticastSocket.EXPECT().GetMroutePacketCount(net.ParseIP(m.source), net.ParseIP(m.group)).Times(1).Return(m.currStats, nil) + if m.isStale { + mockMulticastSocket.EXPECT().DelMrouteEntry(net.ParseIP(m.source), net.ParseIP(m.group), uint16(0)).Times(1) + } + isStale := m.isStale + defer func() { + _, exist, _ := mRoute.outboundRouteCache.Get(outboundMrouteEntry) + require.Equal(t, exist, !isStale) + }() + } + mRoute.updateMrouteStats() +} + +func TestUpdateInboundMrouteStats(t *testing.T) { + mRoute := newMockMulticastRouteClient(t) + err := mRoute.initialize(t) + require.NoError(t, err) + now := time.Now() + for _, m := range []struct { + isStale bool + currPacketCount uint32 + vif uint16 + group string + source string + packetCount uint32 + updatedTime time.Time + }{ + { + group: "224.3.5.7", + source: "192.168.50.60", + updatedTime: now, + isStale: false, + currPacketCount: 0, + vif: 3, + }, + { + group: "224.3.5.8", + source: "192.168.50.61", + updatedTime: now.Add(time.Duration(-mRouteTimeout)), + packetCount: 10, + isStale: false, + currPacketCount: 9, + vif: 4, + }, + { + group: "224.3.5.9", + source: "192.168.50.62", + updatedTime: now.Add(time.Duration(-mRouteTimeout)), + packetCount: 5, + isStale: true, + currPacketCount: 5, + vif: 5, + }, + } { + inboundMrouteEntry := &inboundMulticastRouteEntry{ + multicastRouteEntry: multicastRouteEntry{ + src: m.source, + group: m.group, + pktCount: m.packetCount, + updatedTime: m.updatedTime, + }, + vif: m.vif, + } + mRoute.inboundRouteCache.Add(inboundMrouteEntry) + _, exist, _ := mRoute.inboundRouteCache.Get(inboundMrouteEntry) + require.True(t, exist) + mockMulticastSocket.EXPECT().GetMroutePacketCount(net.ParseIP(m.source), net.ParseIP(m.group)).Times(1).Return(m.currPacketCount, nil) + if m.isStale { + mockMulticastSocket.EXPECT().DelMrouteEntry(net.ParseIP(m.source), net.ParseIP(m.group), m.vif).Times(1) + } + isStale := m.isStale + defer func() { + _, exist, _ := mRoute.inboundRouteCache.Get(inboundMrouteEntry) + require.Equal(t, exist, !isStale) + }() + } + mRoute.updateMrouteStats() +} + func TestProcessIGMPNocacheMsg(t *testing.T) { mRoute := newMockMulticastRouteClient(t) err := mRoute.initialize(t) @@ -236,7 +361,7 @@ func newMockMulticastRouteClient(t *testing.T) *MRouteClient { groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ podInterfaceIndex: podInterfaceIndexFunc, }) - return newRouteClient(nodeConfig, groupCache, mockMulticastSocket, sets.New[string](if1.InterfaceName), false, false) + return newRouteClient(nodeConfig, groupCache, mockMulticastSocket, sets.New[string](if1.InterfaceName), false) } func (c *MRouteClient) initialize(t *testing.T) error { diff --git a/pkg/agent/multicast/mcast_socket_linux.go b/pkg/agent/multicast/mcast_socket_linux.go index f8c946b06cb..3ed1ebab274 100644 --- a/pkg/agent/multicast/mcast_socket_linux.go +++ b/pkg/agent/multicast/mcast_socket_linux.go @@ -60,7 +60,23 @@ func (s *Socket) AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifVIFs [] return multicastsyscall.SetsockoptMfcctl(s.GetFD(), syscall.IPPROTO_IP, multicastsyscall.MRT_ADD_MFC, mc) } -func (s *Socket) DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) { +// GetMroutePacketCount returns the number of routed packets by the multicast route entry. +// The current implementation only supports IPv4 multicast routes. +func (s *Socket) GetMroutePacketCount(src net.IP, group net.IP) (uint32, error) { + srcIP := src.To4() + groupIP := group.To4() + siocSgReq := multicastsyscall.SiocSgReq{ + Src: [4]byte{srcIP[0], srcIP[1], srcIP[2], srcIP[3]}, + Grp: [4]byte{groupIP[0], groupIP[1], groupIP[2], groupIP[3]}, + } + err := multicastsyscall.IoctlGetSiocSgReq(s.GetFD(), &siocSgReq) + if err != nil { + return 0, err + } + return siocSgReq.Pktcnt, nil +} + +func (s *Socket) DelMrouteEntry(src net.IP, group net.IP, iif uint16) error { mc := &multicastsyscall.Mfcctl{} origin := src.To4() mc.Origin = [4]byte{origin[0], origin[1], origin[2], origin[3]} diff --git a/pkg/agent/multicast/mcast_socket_others.go b/pkg/agent/multicast/mcast_socket_others.go index f60848c1f8a..bcadabc4e0d 100644 --- a/pkg/agent/multicast/mcast_socket_others.go +++ b/pkg/agent/multicast/mcast_socket_others.go @@ -27,11 +27,15 @@ const ( SizeofIgmpmsg = 0 ) -func (s *Socket) AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifVIFs []uint16) (err error) { +func (s *Socket) AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifVIFs []uint16) error { return nil } -func (s *Socket) DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) { +func (s *Socket) GetMroutePacketCount(src net.IP, group net.IP) (uint32, error) { + return 0, nil +} + +func (s *Socket) DelMrouteEntry(src net.IP, group net.IP, iif uint16) error { return nil } diff --git a/pkg/agent/multicast/testing/mock_multicast.go b/pkg/agent/multicast/testing/mock_multicast.go index 0c13ef9b311..277876ca2da 100644 --- a/pkg/agent/multicast/testing/mock_multicast.go +++ b/pkg/agent/multicast/testing/mock_multicast.go @@ -122,6 +122,21 @@ func (mr *MockRouteInterfaceMockRecorder) GetFD() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFD", reflect.TypeOf((*MockRouteInterface)(nil).GetFD)) } +// GetMroutePacketCount mocks base method. +func (m *MockRouteInterface) GetMroutePacketCount(arg0, arg1 net.IP) (uint32, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMroutePacketCount", arg0, arg1) + ret0, _ := ret[0].(uint32) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMroutePacketCount indicates an expected call of GetMroutePacketCount. +func (mr *MockRouteInterfaceMockRecorder) GetMroutePacketCount(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMroutePacketCount", reflect.TypeOf((*MockRouteInterface)(nil).GetMroutePacketCount), arg0, arg1) +} + // MulticastInterfaceJoinMgroup mocks base method. func (m *MockRouteInterface) MulticastInterfaceJoinMgroup(arg0, arg1 net.IP, arg2 string) error { m.ctrl.T.Helper() diff --git a/pkg/agent/util/syscall/linux/types.go b/pkg/agent/util/syscall/linux/types.go index 883bfb250b0..0f3d6461441 100644 --- a/pkg/agent/util/syscall/linux/types.go +++ b/pkg/agent/util/syscall/linux/types.go @@ -47,6 +47,7 @@ const ( type Mfcctl C.struct_mfcctl type Vifctl C.struct_vifctl_with_ifindex +type SiocSgReq C.struct_siocsgreq const SizeofMfcctl = C.sizeof_struct_mfcctl const SizeofVifctl = C.sizeof_struct_vifctl_with_ifindex diff --git a/pkg/agent/util/syscall/syscall_unix.go b/pkg/agent/util/syscall/syscall_unix.go index 6a1a494f9cd..46c4c4f4d8b 100644 --- a/pkg/agent/util/syscall/syscall_unix.go +++ b/pkg/agent/util/syscall/syscall_unix.go @@ -34,7 +34,13 @@ func setsockopt(s int, level int, name int, val unsafe.Pointer, vallen uintptr) return } -// Please add your wrapped syscall functions below +func ioctlPtr(fd int, req uint, arg unsafe.Pointer) (err error) { + _, _, e1 := syscall.Syscall(syscall.SYS_IOCTL, uintptr(fd), uintptr(req), uintptr(arg)) + if e1 != 0 { + return e1 + } + return +} func SetsockoptMfcctl(fd, level, opt int, mfcctl *Mfcctl) error { return setsockopt(fd, level, opt, unsafe.Pointer(mfcctl), SizeofMfcctl) @@ -43,3 +49,7 @@ func SetsockoptMfcctl(fd, level, opt int, mfcctl *Mfcctl) error { func SetsockoptVifctl(fd, level, opt int, vifctl *Vifctl) error { return setsockopt(fd, level, opt, unsafe.Pointer(vifctl), SizeofVifctl) } + +func IoctlGetSiocSgReq(fd int, siocsgreq *SiocSgReq) error { + return ioctlPtr(fd, SIOCGETSGCNT, unsafe.Pointer(siocsgreq)) +} diff --git a/pkg/agent/util/syscall/ztypes_linux.go b/pkg/agent/util/syscall/ztypes_linux.go index 2d064ccfe3c..ec443eee2be 100644 --- a/pkg/agent/util/syscall/ztypes_linux.go +++ b/pkg/agent/util/syscall/ztypes_linux.go @@ -26,6 +26,7 @@ const ( MRT_INIT = 0xc8 MRT_FLUSH = 0xd4 MAXVIFS = 0x20 + SIOCGETSGCNT = 0x89e1 ) type Mfcctl struct { @@ -35,7 +36,7 @@ type Mfcctl struct { Ttls [32]uint8 Pkt_cnt uint32 Byte_cnt uint32 - Wrong_if uint32 + Wrong_if uint32 /* number wrong of iif hits */ Expire int32 } @@ -48,6 +49,18 @@ type Vifctl struct { Rmt_addr [4]byte /* in_addr */ } +// SiocSgReq is the Golang version of Linux kernel struct sioc_sg_req. +// Please check https://github.com/torvalds/linux/blob/master/include/uapi/linux/mroute.h#L92. +// The struct encodes the packet count and byte count of a multicast route +// identified by Src(source) and Grp(group). +type SiocSgReq = struct { + Src [4]byte /* in_addr */ + Grp [4]byte /* in_addr */ + Pktcnt uint32 + Bytecnt uint32 + Wrong_if uint32 /* number wrong of iif hits */ +} + const SizeofMfcctl = 0x3c const SizeofVifctl = 0x10 const SizeofIgmpmsg = 0x14