Skip to content

Commit

Permalink
[Multicast] Support removal of staled outbound multicast routes
Browse files Browse the repository at this point in the history
Signed-off-by: ceclinux <[email protected]>
  • Loading branch information
ceclinux committed Feb 21, 2022
1 parent 871806d commit 59cc317
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 0 deletions.
32 changes: 32 additions & 0 deletions pkg/agent/multicast/mcast_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"net"
"strings"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
Expand All @@ -38,6 +39,7 @@ func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, mul
nodeConfig: nodeconfig,
groupCache: groupCache,
inboundRouteCache: cache.NewIndexer(getMulticastInboundEntryKey, cache.Indexers{GroupNameIndexName: inboundGroupIndexFunc}),
outboundRouteCache: cache.NewIndexer(getMulticastOutboundEntryKey, cache.Indexers{}),
multicastInterfaces: multicastInterfaces.List(),
socket: multicastSocket,
}
Expand Down Expand Up @@ -72,6 +74,7 @@ type MRouteClient struct {
nodeConfig *config.NodeConfig
multicastInterfaces []string
inboundRouteCache cache.Indexer
outboundRouteCache cache.Indexer
groupCache cache.Indexer
socket RouteInterface
multicastInterfaceConfigs []multicastInterfaceConfig
Expand Down Expand Up @@ -165,6 +168,12 @@ func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err err
if err != nil {
return err
}
routeEntry := &outboundMulticastRouteEntry{
group: group.String(),
src: src.String(),
createdTime: time.Now(),
}
c.outboundRouteCache.Add(routeEntry)
return nil
}

Expand Down Expand Up @@ -200,11 +209,33 @@ type inboundMulticastRouteEntry struct {
vif uint16
}

// outboundMulticastRouteEntry encodes the outbound multicast routing entry.
// For example,
// type inboundMulticastRouteEntry struct {
// group "226.94.9.9"
// src "10.0.0.55"
// vifs vifs of wlan0, ens220
// } encodes the multicast route entry from Antrea gateway to multicast interfaces
// (10.0.0.55,226.94.9.9) Iif: antrea-gw0 Oifs: wlan0, ens220.
// The iif is always Antrea gateway and oifs are always outbound interfaces
// so we do not put them in the struct.
type outboundMulticastRouteEntry struct {
group string
src string
pktCount uint32
createdTime time.Time
}

func getMulticastInboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*inboundMulticastRouteEntry)
return entry.group + "/" + entry.src + "/" + fmt.Sprint(entry.vif), nil
}

func getMulticastOutboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*outboundMulticastRouteEntry)
return entry.group + "/" + entry.src, nil
}

func inboundGroupIndexFunc(obj interface{}) ([]string, error) {
entry, ok := obj.(*inboundMulticastRouteEntry)
if !ok {
Expand Down Expand Up @@ -272,6 +303,7 @@ type RouteInterface interface {
// AddMrouteEntry adds multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif) and outbound multicast interfaces(oifs).
AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) (err error)
GetoutboundMroutePackets(outboundRoute *outboundMulticastRouteEntry) (pktCount uint32, err error)
// DelMrouteEntry deletes multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif).
DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error)
Expand Down
41 changes: 41 additions & 0 deletions pkg/agent/multicast/mcast_route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ import (
"fmt"
"net"
"syscall"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/util/runtime"
)

const (
outboundMRouteTimeout = time.Second * 60
)

// parseIGMPMsg parses the kernel version into parsedIGMPMsg. Note we need to consider the change
// after linux 5.9 in the igmpmsg struct when parsing vif. Please check
// https://github.com/torvalds/linux/commit/c8715a8e9f38906e73d6d78764216742db13ba0e.
Expand Down Expand Up @@ -71,10 +77,45 @@ func (c *MRouteClient) run(stopCh <-chan struct{}) {
}
}()

go wait.NonSlidingUntil(c.clearStaleOutboundRoutes, outboundMRouteTimeout, stopCh)

for i := 0; i < int(workerCount); i++ {
go c.worker(stopCh)
}
<-stopCh
c.socket.FlushMRoute()
syscall.Close(c.socket.GetFD())
}

func (c *MRouteClient) clearStaleOutboundRoutes() {
klog.V(2).Infof("Updating outbound multicast route statistics and removing staled routes")
deletedOutboundRoutes := []*outboundMulticastRouteEntry{}
now := time.Now()
for _, outboundRoute := range c.outboundRouteCache.List() {
outboundRoute, _ := outboundRoute.(*outboundMulticastRouteEntry)
if now.Sub(outboundRoute.createdTime) < outboundMRouteTimeout {
continue
}
packetCount, err := c.socket.GetoutboundMroutePackets(outboundRoute)
if err != nil {
klog.Infof("%+v", err)
return
}
packetCountDiff := packetCount - outboundRoute.pktCount
klog.V(4).Infof("Current mroute stats is %+v, send %d packets in last minute", outboundRoute, packetCountDiff)
if packetCountDiff == uint32(0) {
deletedOutboundRoutes = append(deletedOutboundRoutes, outboundRoute)
}
outboundRoute.pktCount = packetCount
c.outboundRouteCache.Update(outboundRoute)
}
for _, route := range deletedOutboundRoutes {
klog.Infof("Deleting staled mroute stats %+v")
err := c.socket.DelMrouteEntry(net.ParseIP(route.src), net.ParseIP(route.group), c.internalInterfaceVIF)
if err != nil {
klog.Infof(err.Error())
return
}
c.outboundRouteCache.Delete(route)
}
}
14 changes: 14 additions & 0 deletions pkg/agent/multicast/mcast_socket_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ func (s *Socket) AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifVIFs []
return multicastsyscall.SetsockoptMfcctl(s.GetFD(), syscall.IPPROTO_IP, multicastsyscall.MRT_ADD_MFC, mc)
}

func (s *Socket) GetoutboundMroutePackets(outboundRoute *outboundMulticastRouteEntry) (pktCount uint32, err error) {
srcIP := net.ParseIP(outboundRoute.src).To4()
groupIP := net.ParseIP(outboundRoute.group).To4()
sioqSgReq := multicastsyscall.SiocSgReq{
Src: [4]byte{srcIP[0], srcIP[1], srcIP[2], srcIP[3]},
Grp: [4]byte{groupIP[0], groupIP[1], groupIP[2], groupIP[3]},
}
stats, err := multicastsyscall.IoctlGetiocSgReq(s.GetFD(), &sioqSgReq)
if err != nil {
return 0, err
}
return stats.Pktcnt, nil
}

func (s *Socket) DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) {
mc := &multicastsyscall.Mfcctl{}
origin := src.To4()
Expand Down
16 changes: 16 additions & 0 deletions pkg/agent/multicast/testing/mock_multicast.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions pkg/agent/util/syscall/syscall_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package syscall

import (
"runtime"
"syscall"
"unsafe"
)
Expand All @@ -34,6 +35,14 @@ func setsockopt(s int, level int, name int, val unsafe.Pointer, vallen uintptr)
return
}

func ioctl(fd int, req uint, arg uintptr) (err error) {
_, _, e1 := syscall.Syscall(syscall.SYS_IOCTL, uintptr(fd), uintptr(req), uintptr(arg))
if e1 != 0 {
return e1
}
return
}

// Please add your wrapped syscall functions below

func SetsockoptMfcctl(fd, level, opt int, mfcctl *Mfcctl) error {
Expand All @@ -43,3 +52,9 @@ func SetsockoptMfcctl(fd, level, opt int, mfcctl *Mfcctl) error {
func SetsockoptVifctl(fd, level, opt int, vifctl *Vifctl) error {
return setsockopt(fd, level, opt, unsafe.Pointer(vifctl), SizeofVifctl)
}

func IoctlGetiocSgReq(fd int, value *SiocSgReq) (*SiocSgReq, error) {
err := ioctl(fd, SIOCGETSGCNT, uintptr(unsafe.Pointer(value)))
runtime.KeepAlive(value)
return value, err
}
14 changes: 14 additions & 0 deletions pkg/agent/util/syscall/ztypes_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,17 @@ type Vifctl struct {
const SizeofMfcctl = 0x3c
const SizeofVifctl = 0x10
const SizeofIgmpmsg = 0x14

const (
SIOCGETSGCNT = 0x89e1
)

type SiocSgReq = struct {
Src [4]byte /* in_addr */
Grp [4]byte /* in_addr */
Pktcnt uint32
Bytecnt uint32
If uint32
}

const SizeofSiocSgReq = 0x20

0 comments on commit 59cc317

Please sign in to comment.