Skip to content

Commit

Permalink
[Multicast] Support removal of staled outbound multicast routes
Browse files Browse the repository at this point in the history
Signed-off-by: ceclinux <[email protected]>
  • Loading branch information
ceclinux committed Mar 4, 2022
1 parent 871806d commit 7fc2619
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 0 deletions.
33 changes: 33 additions & 0 deletions pkg/agent/multicast/mcast_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"net"
"strings"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
Expand All @@ -38,6 +39,7 @@ func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, mul
nodeConfig: nodeconfig,
groupCache: groupCache,
inboundRouteCache: cache.NewIndexer(getMulticastInboundEntryKey, cache.Indexers{GroupNameIndexName: inboundGroupIndexFunc}),
outboundRouteCache: cache.NewIndexer(getMulticastOutboundEntryKey, cache.Indexers{}),
multicastInterfaces: multicastInterfaces.List(),
socket: multicastSocket,
}
Expand Down Expand Up @@ -72,6 +74,7 @@ type MRouteClient struct {
nodeConfig *config.NodeConfig
multicastInterfaces []string
inboundRouteCache cache.Indexer
outboundRouteCache cache.Indexer
groupCache cache.Indexer
socket RouteInterface
multicastInterfaceConfigs []multicastInterfaceConfig
Expand Down Expand Up @@ -165,6 +168,12 @@ func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err err
if err != nil {
return err
}
routeEntry := &outboundMulticastRouteEntry{
group: group.String(),
src: src.String(),
createdTime: time.Now(),
}
c.outboundRouteCache.Add(routeEntry)
return nil
}

Expand Down Expand Up @@ -200,11 +209,33 @@ type inboundMulticastRouteEntry struct {
vif uint16
}

// outboundMulticastRouteEntry encodes the outbound multicast routing entry.
// For example,
// type inboundMulticastRouteEntry struct {
// group "226.94.9.9"
// src "10.0.0.55"
// } encodes the multicast route entry from Antrea gateway to multicast interfaces
// (10.0.0.55,226.94.9.9) Iif: antrea-gw0 Oifs: list of multicastInterfaces.
// The iif is always Antrea gateway and oifs are always outbound interfaces
// so we do not put them in the struct.
// Field pktCount and createdTime are used for removing staled multicast routes.
type outboundMulticastRouteEntry struct {
group string
src string
pktCount uint32
createdTime time.Time
}

func getMulticastInboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*inboundMulticastRouteEntry)
return entry.group + "/" + entry.src + "/" + fmt.Sprint(entry.vif), nil
}

func getMulticastOutboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*outboundMulticastRouteEntry)
return entry.group + "/" + entry.src, nil
}

func inboundGroupIndexFunc(obj interface{}) ([]string, error) {
entry, ok := obj.(*inboundMulticastRouteEntry)
if !ok {
Expand Down Expand Up @@ -272,6 +303,8 @@ type RouteInterface interface {
// AddMrouteEntry adds multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif) and outbound multicast interfaces(oifs).
AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) (err error)
// GetoutboundMroutePacketCount returns number of routed by outboundRoute entry.
GetoutboundMroutePacketCount(src net.IP, group net.IP) (pktCount uint32, err error)
// DelMrouteEntry deletes multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif).
DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error)
Expand Down
42 changes: 42 additions & 0 deletions pkg/agent/multicast/mcast_route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ import (
"fmt"
"net"
"syscall"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/util/runtime"
)

const (
outboundMRouteTimeout = time.Second * 60
)

// parseIGMPMsg parses the kernel version into parsedIGMPMsg. Note we need to consider the change
// after linux 5.9 in the igmpmsg struct when parsing vif. Please check
// https://github.com/torvalds/linux/commit/c8715a8e9f38906e73d6d78764216742db13ba0e.
Expand Down Expand Up @@ -71,10 +77,46 @@ func (c *MRouteClient) run(stopCh <-chan struct{}) {
}
}()

go wait.NonSlidingUntil(c.updateOutboundMrouteStats, outboundMRouteTimeout, stopCh)

for i := 0; i < int(workerCount); i++ {
go c.worker(stopCh)
}
<-stopCh
c.socket.FlushMRoute()
syscall.Close(c.socket.GetFD())
}

func (c *MRouteClient) updateOutboundMrouteStats() {
klog.V(2).InfoS("Updating outbound multicast route statistics and removing staled routes")
deletedOutboundRoutes := make([]*outboundMulticastRouteEntry, 0)
now := time.Now()
for _, obj := range c.outboundRouteCache.List() {
outboundRoute, _ := obj.(*outboundMulticastRouteEntry)
if now.Sub(outboundRoute.createdTime) < outboundMRouteTimeout {
continue
}
packetCount, err := c.socket.GetoutboundMroutePacketCount(net.ParseIP(outboundRoute.src), net.ParseIP(outboundRoute.group))
if err != nil {
klog.ErrorS(err, "Failed to get local packet count for outbound multicast route", "outboundRoute", outboundRoute)
return
}
packetCountDiff := packetCount - outboundRoute.pktCount
klog.V(4).Infof("Current mroute stats is %+v, send %d packets in last minute", outboundRoute, packetCountDiff)
if packetCountDiff == uint32(0) {
deletedOutboundRoutes = append(deletedOutboundRoutes, outboundRoute)
} else {
outboundRoute.pktCount = packetCount
c.outboundRouteCache.Update(outboundRoute)
}
}
for _, outboundRoute := range deletedOutboundRoutes {
klog.Infof("Deleting staled outbound multicast route", "group", outboundRoute.group, "source", outboundRoute.src)
err := c.socket.DelMrouteEntry(net.ParseIP(outboundRoute.src), net.ParseIP(outboundRoute.group), c.internalInterfaceVIF)
if err != nil {
klog.ErrorS(err, "Failed to delete outbound multicast route", "group", outboundRoute.group, "source", outboundRoute.src)
return
}
c.outboundRouteCache.Delete(outboundRoute)
}
}
12 changes: 12 additions & 0 deletions pkg/agent/multicast/mcast_socket_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ func (s *Socket) AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifVIFs []
return multicastsyscall.SetsockoptMfcctl(s.GetFD(), syscall.IPPROTO_IP, multicastsyscall.MRT_ADD_MFC, mc)
}

func (s *Socket) GetoutboundMroutePacketCount(src net.IP, group net.IP) (pktCount uint32, err error) {
sioqSgReq := multicastsyscall.SiocSgReq{
Src: [4]byte{src[0], src[1], src[2], src[3]},
Grp: [4]byte{group[0], group[1], group[2], group[3]},
}
stats, err := multicastsyscall.IoctlGetiocSgReq(s.GetFD(), &sioqSgReq)
if err != nil {
return 0, err
}
return stats.Pktcnt, nil
}

func (s *Socket) DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) {
mc := &multicastsyscall.Mfcctl{}
origin := src.To4()
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/multicast/mcast_socket_others.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func (s *Socket) AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifVIFs []
return nil
}

func (s *Socket) GetoutboundMroutePacketCount(src net.IP, group net.IP) (pktCount uint32, err error) {
return 0, nil
}

func (s *Socket) DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) {
return nil
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/agent/multicast/testing/mock_multicast.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions pkg/agent/util/syscall/syscall_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package syscall

import (
"runtime"
"syscall"
"unsafe"
)
Expand All @@ -34,6 +35,14 @@ func setsockopt(s int, level int, name int, val unsafe.Pointer, vallen uintptr)
return
}

func ioctl(fd int, req uint, arg uintptr) (err error) {
_, _, e1 := syscall.Syscall(syscall.SYS_IOCTL, uintptr(fd), uintptr(req), uintptr(arg))
if e1 != 0 {
return e1
}
return
}

// Please add your wrapped syscall functions below

func SetsockoptMfcctl(fd, level, opt int, mfcctl *Mfcctl) error {
Expand All @@ -43,3 +52,9 @@ func SetsockoptMfcctl(fd, level, opt int, mfcctl *Mfcctl) error {
func SetsockoptVifctl(fd, level, opt int, vifctl *Vifctl) error {
return setsockopt(fd, level, opt, unsafe.Pointer(vifctl), SizeofVifctl)
}

func IoctlGetiocSgReq(fd int, value *SiocSgReq) (*SiocSgReq, error) {
err := ioctl(fd, SIOCGETSGCNT, uintptr(unsafe.Pointer(value)))
runtime.KeepAlive(value)
return value, err
}
14 changes: 14 additions & 0 deletions pkg/agent/util/syscall/ztypes_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,17 @@ type Vifctl struct {
const SizeofMfcctl = 0x3c
const SizeofVifctl = 0x10
const SizeofIgmpmsg = 0x14

const (
SIOCGETSGCNT = 0x89e1
)

type SiocSgReq = struct {
Src [4]byte /* in_addr */
Grp [4]byte /* in_addr */
Pktcnt uint32
Bytecnt uint32
If uint32
}

const SizeofSiocSgReq = 0x20

0 comments on commit 7fc2619

Please sign in to comment.