Skip to content

Commit

Permalink
Refactor multicast NetworkPolicy controller interface (antrea-io#3901)
Browse files Browse the repository at this point in the history
Rename interface MulticastValidator to McastNetworkPolicyController, func Validate() to GetIGMPNPRuleInfo, and struct McastNPValidationItem to IGMPNPRuleInfo.

Signed-off-by: Bin Liu <[email protected]>
  • Loading branch information
liu4480 authored Jun 22, 2022
1 parent f12f150 commit 810488a
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 127 deletions.
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ func run(o *Options) error {
if err != nil {
return fmt.Errorf("failed to create multicast socket")
}
var validator agenttypes.MulticastValidator
var validator agenttypes.McastNetworkPolicyController
if antreaPolicyEnabled {
validator = networkPolicyController
}
Expand Down
2 changes: 1 addition & 1 deletion hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ function generate_mocks {
"pkg/agent/flowexporter/connections ConnTrackDumper,NetFilterConnTrack testing"
"pkg/agent/interfacestore InterfaceStore testing"
"pkg/agent/multicast RouteInterface testing"
"pkg/agent/types MulticastValidator testing"
"pkg/agent/types McastNetworkPolicyController testing"
"pkg/agent/nodeportlocal/portcache LocalPortOpener testing"
"pkg/agent/nodeportlocal/rules PodPortRules testing"
"pkg/agent/openflow Client,OFEntryOperations testing"
Expand Down
28 changes: 12 additions & 16 deletions pkg/agent/controller/networkpolicy/networkpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/workqueue"
Expand All @@ -38,7 +37,6 @@ import (
proxytypes "antrea.io/antrea/pkg/agent/proxy/types"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/apis/crd/v1alpha1"
"antrea.io/antrea/pkg/querier"
"antrea.io/antrea/pkg/util/channel"
)
Expand Down Expand Up @@ -493,18 +491,17 @@ func (c *Controller) matchIGMPType(r *rule, igmpType uint8, groupAddress string)
return false
}

// Validate checks if there is rule to drop or allow IGMP report from a Pod to a group Address, and returns multicast
// NetworkPolicy Information
func (c *Controller) Validate(podName, podNamespace string, groupAddress net.IP, igmpType uint8) (types.McastNPValidationItem, error) {
var ruleTypePtr *v1beta2.NetworkPolicyType
action, uuid, ruleName := v1alpha1.RuleActionAllow, apitypes.UID(""), ""
// GetIGMPNPRuleInfo looks up the IGMP NetworkPolicy rule that matches the given Pod and groupAddress,
// and returns the rule information if found.
func (c *Controller) GetIGMPNPRuleInfo(podName, podNamespace string, groupAddress net.IP, igmpType uint8) (*types.IGMPNPRuleInfo, error) {
member := &v1beta2.GroupMember{
Pod: &v1beta2.PodReference{
Name: podName,
Namespace: podNamespace,
},
}

var ruleInfo *types.IGMPNPRuleInfo
objects, _ := c.ruleCache.rules.ByIndex(toIGMPReportGroupAddressIndex, groupAddress.String())
objects2, _ := c.ruleCache.rules.ByIndex(toIGMPReportGroupAddressIndex, "")
objects = append(objects, objects2...)
Expand All @@ -522,15 +519,14 @@ func (c *Controller) Validate(podName, podNamespace string, groupAddress net.IP,
}

if matchedRule != nil {
ruleTypePtr = new(v1beta2.NetworkPolicyType)
action, uuid, *ruleTypePtr, ruleName = *matchedRule.Action, matchedRule.PolicyUID, matchedRule.SourceRef.Type, matchedRule.Name
}
return types.McastNPValidationItem{
RuleAction: action,
UUID: uuid,
NPType: ruleTypePtr,
Name: ruleName,
}, nil
ruleInfo = &types.IGMPNPRuleInfo{
RuleAction: *matchedRule.Action,
UUID: matchedRule.PolicyUID,
NPType: &matchedRule.SourceRef.Type,
Name: matchedRule.Name,
}
}
return ruleInfo, nil
}

func (c *Controller) enqueueRule(ruleID string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,14 +677,14 @@ func TestValidate(t *testing.T) {
controller.ruleCache.appliedToSetByGroup["appliedToGroup01"] = groups
controller.ruleCache.rules.Add(rule1)
controller.ruleCache.rules.Add(rule2)
item, err := controller.Validate("pod1", "ns1", net.ParseIP(groupAddress1), 0x12)
item, err := controller.GetIGMPNPRuleInfo("pod1", "ns1", net.ParseIP(groupAddress1), 0x12)
if err != nil {
t.Fatalf("failed to validate group %s %v", groupAddress1, err)
}
if item.RuleAction != v1alpha1.RuleActionAllow {
t.Fatalf("groupAddress %s expect %v, but got %v", groupAddress1, v1alpha1.RuleActionAllow, item.RuleAction)
}
item, err = controller.Validate("pod1", "ns1", net.ParseIP(groupAddress2), 0x12)
item, err = controller.GetIGMPNPRuleInfo("pod1", "ns1", net.ParseIP(groupAddress2), 0x12)
if err != nil {
t.Fatalf("failed to validate group %s %+v", groupAddress2, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func NewMulticastController(ofClient openflow.Client,
ovsBridgeClient ovsconfig.OVSBridgeClient,
podUpdateSubscriber channel.Subscriber,
igmpQueryInterval time.Duration,
validator types.MulticastValidator) *Controller {
validator types.McastNetworkPolicyController) *Controller {
eventCh := make(chan *mcastGroupEvent, workerCount)
groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, validator)
groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{
Expand Down
90 changes: 20 additions & 70 deletions pkg/agent/multicast/mcast_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
mockOFClient *openflowtest.MockClient
mockMulticastSocket *multicasttest.MockRouteInterface
mockIfaceStore *ifaceStoretest.MockInterfaceStore
mockMulticastValidator *typestest.MockMulticastValidator
mockMulticastValidator *typestest.MockMcastNetworkPolicyController
ovsClient *ovsconfigtest.MockOVSBridgeClient
if1 = &interfacestore.InterfaceConfig{
Type: interfacestore.ContainerInterface,
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestProcessPacketIn(t *testing.T) {
iface *interfacestore.InterfaceConfig
version uint8
joinedGroups sets.String
joinedGroupItems map[string]types.McastNPValidationItem
joinedGroupItems map[string]*types.IGMPNPRuleInfo
leftGroups sets.String
igmpANPStats map[apitypes.UID]map[string]*types.RuleMetric
igmpACNPStats map[apitypes.UID]map[string]*types.RuleMetric
Expand All @@ -315,19 +315,9 @@ func TestProcessPacketIn(t *testing.T) {
{
iface: createInterface("p1", 1),
joinedGroups: sets.NewString("224.1.101.2", "224.1.101.3", "224.1.101.4"),
joinedGroupItems: map[string]types.McastNPValidationItem{
"224.1.101.2": {
RuleAction: allow,
UUID: "",
NPType: nil,
Name: "",
},
"224.1.101.3": {
RuleAction: allow,
UUID: "",
NPType: nil,
Name: "",
},
joinedGroupItems: map[string]*types.IGMPNPRuleInfo{
"224.1.101.2": nil,
"224.1.101.3": nil,
"224.1.101.4": {
RuleAction: allow,
UUID: "anp",
Expand All @@ -344,19 +334,14 @@ func TestProcessPacketIn(t *testing.T) {
{
iface: createInterface("p11", 1),
joinedGroups: sets.NewString("224.1.101.20", "224.1.101.21", "224.1.101.22", "224.1.101.23"),
joinedGroupItems: map[string]types.McastNPValidationItem{
joinedGroupItems: map[string]*types.IGMPNPRuleInfo{
"224.1.101.20": {
RuleAction: drop,
UUID: "anp",
NPType: &anp,
Name: "block10120",
},
"224.1.101.2": {
RuleAction: allow,
UUID: "",
NPType: nil,
Name: "",
},
"224.1.101.2": nil,
"224.1.101.22": {
RuleAction: allow,
UUID: "anp",
Expand All @@ -378,25 +363,10 @@ func TestProcessPacketIn(t *testing.T) {
{
iface: createInterface("p2", 2),
joinedGroups: sets.NewString("224.1.102.2", "224.1.102.3", "224.1.102.4"),
joinedGroupItems: map[string]types.McastNPValidationItem{
"224.1.102.2": {
RuleAction: allow,
UUID: "",
NPType: nil,
Name: "",
},
"224.1.102.3": {
RuleAction: allow,
UUID: "",
NPType: nil,
Name: "",
},
"224.1.102.4": {
RuleAction: allow,
UUID: "",
NPType: nil,
Name: "",
},
joinedGroupItems: map[string]*types.IGMPNPRuleInfo{
"224.1.102.2": nil,
"224.1.102.3": nil,
"224.1.102.4": nil,
},
leftGroups: sets.NewString("224.1.102.3"),
igmpANPStats: map[apitypes.UID]map[string]*types.RuleMetric{"anp": {"allow_10122": {Bytes: 42, Packets: 1}, "block10120": {Bytes: 42, Packets: 1}, "allow_1014": {Bytes: 42, Packets: 1}}},
Expand All @@ -407,25 +377,10 @@ func TestProcessPacketIn(t *testing.T) {
{
iface: createInterface("p22", 2),
joinedGroups: sets.NewString("224.1.102.21", "224.1.102.22", "224.1.102.23", "224.1.102.24"),
joinedGroupItems: map[string]types.McastNPValidationItem{
"224.1.102.21": {
RuleAction: allow,
UUID: "",
NPType: nil,
Name: "",
},
"224.1.102.22": {
RuleAction: allow,
UUID: "",
NPType: nil,
Name: "",
},
"224.1.102.23": {
RuleAction: allow,
UUID: "",
NPType: nil,
Name: "",
},
joinedGroupItems: map[string]*types.IGMPNPRuleInfo{
"224.1.102.21": nil,
"224.1.102.22": nil,
"224.1.102.23": nil,
"224.1.102.24": {
RuleAction: drop,
UUID: "anp",
Expand All @@ -442,13 +397,8 @@ func TestProcessPacketIn(t *testing.T) {
{
iface: createInterface("p33", 3),
joinedGroups: sets.NewString("224.1.103.2", "224.1.103.3", "224.1.103.4"),
joinedGroupItems: map[string]types.McastNPValidationItem{
"224.1.103.2": {
RuleAction: allow,
UUID: "",
NPType: nil,
Name: "",
},
joinedGroupItems: map[string]*types.IGMPNPRuleInfo{
"224.1.103.2": nil,
"224.1.103.3": {
RuleAction: drop,
UUID: "acnp2",
Expand All @@ -475,11 +425,11 @@ func TestProcessPacketIn(t *testing.T) {

if tc.version == 3 {
for _, leftGroup := range tc.leftGroups.List() {
mockMulticastValidator.EXPECT().Validate(tc.iface.InterfaceName, tc.iface.PodNamespace, net.ParseIP(leftGroup).To4(), gomock.Any()).Times(1)
mockMulticastValidator.EXPECT().GetIGMPNPRuleInfo(tc.iface.InterfaceName, tc.iface.PodNamespace, net.ParseIP(leftGroup).To4(), gomock.Any()).Times(1)
}
}
for _, joinedGroup := range tc.joinedGroups.List() {
mockMulticastValidator.EXPECT().Validate(tc.iface.InterfaceName, tc.iface.PodNamespace, net.ParseIP(joinedGroup).To4(), gomock.Any()).Return(tc.joinedGroupItems[joinedGroup], nil).Times(1)
mockMulticastValidator.EXPECT().GetIGMPNPRuleInfo(tc.iface.InterfaceName, tc.iface.PodNamespace, net.ParseIP(joinedGroup).To4(), gomock.Any()).Return(tc.joinedGroupItems[joinedGroup], nil).Times(1)
}
for _, pkt := range packets {
mockIfaceStore.EXPECT().GetInterfaceByOFPort(uint32(tc.iface.OFPort)).Return(tc.iface, true)
Expand Down Expand Up @@ -522,7 +472,7 @@ func newMockMulticastController(t *testing.T) *Controller {
mockOFClient = openflowtest.NewMockClient(controller)
mockIfaceStore = ifaceStoretest.NewMockInterfaceStore(controller)
mockMulticastSocket = multicasttest.NewMockRouteInterface(controller)
mockMulticastValidator = typestest.NewMockMulticastValidator(controller)
mockMulticastValidator = typestest.NewMockMcastNetworkPolicyController(controller)
ovsClient = ovsconfigtest.NewMockOVSBridgeClient(controller)
addr := &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}
nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{Name: "antrea-gw0"}, NodeIPv4Addr: addr}
Expand Down
24 changes: 13 additions & 11 deletions pkg/agent/multicast/mcast_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type IGMPSnooper struct {
ofClient openflow.Client
ifaceStore interfacestore.InterfaceStore
eventCh chan *mcastGroupEvent
validator types.MulticastValidator
validator types.McastNetworkPolicyController
queryInterval time.Duration
// igmpReportANPStats is a map that saves AntreaNetworkPolicyStats of IGMP report packets.
// The map can be interpreted as
Expand Down Expand Up @@ -130,20 +130,22 @@ func (s *IGMPSnooper) validate(event *mcastGroupEvent, igmpType uint8, packetInD
if event.iface.Type != interfacestore.ContainerInterface {
return true, fmt.Errorf("interface is not container")
}
// Validate checks if packet should be dropped or not, and returns multicast NP information
item, err := s.validator.Validate(event.iface.PodName, event.iface.PodNamespace, event.group, igmpType)

ruleInfo, err := s.validator.GetIGMPNPRuleInfo(event.iface.PodName, event.iface.PodNamespace, event.group, igmpType)
if err != nil {
// It shall drop the packet if function Validate returns error
klog.ErrorS(err, "Failed to validate multicast group event")
return false, err
}
klog.V(2).InfoS("Got NetworkPolicy action for IGMP report", "RuleAction", item.RuleAction, "uuid", item.UUID, "Name", item.Name)
if item.NPType != nil {
s.addToIGMPReportNPStatsMap(item, uint64(packetInData.Len()))
}
if item.RuleAction == v1alpha1.RuleActionDrop {
return false, nil

if ruleInfo != nil {
klog.V(2).InfoS("Got NetworkPolicy action for IGMP report", "RuleAction", ruleInfo.RuleAction, "uuid", ruleInfo.UUID, "Name", ruleInfo.Name)
s.addToIGMPReportNPStatsMap(*ruleInfo, uint64(packetInData.Len()))
if ruleInfo.RuleAction == v1alpha1.RuleActionDrop {
return false, nil
}
}

return true, nil
}

Expand All @@ -162,7 +164,7 @@ func (s *IGMPSnooper) validatePacketAndNotify(event *mcastGroupEvent, igmpType u
s.eventCh <- event
}

func (s *IGMPSnooper) addToIGMPReportNPStatsMap(item types.McastNPValidationItem, packetLen uint64) {
func (s *IGMPSnooper) addToIGMPReportNPStatsMap(item types.IGMPNPRuleInfo, packetLen uint64) {
updateRuleStats := func(igmpReportStatsMap map[apitypes.UID]map[string]*types.RuleMetric, uuid apitypes.UID, name string) {
if igmpReportStatsMap[uuid] == nil {
igmpReportStatsMap[uuid] = make(map[string]*types.RuleMetric)
Expand Down Expand Up @@ -330,7 +332,7 @@ func parseIGMPPacket(pkt protocol.Ethernet) (protocol.IGMPMessage, error) {
}
}

func newSnooper(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, eventCh chan *mcastGroupEvent, queryInterval time.Duration, multicastValidator types.MulticastValidator) *IGMPSnooper {
func newSnooper(ofClient openflow.Client, ifaceStore interfacestore.InterfaceStore, eventCh chan *mcastGroupEvent, queryInterval time.Duration, multicastValidator types.McastNetworkPolicyController) *IGMPSnooper {
snooper := &IGMPSnooper{ofClient: ofClient, ifaceStore: ifaceStore, eventCh: eventCh, validator: multicastValidator, queryInterval: queryInterval}
snooper.igmpReportACNPStats = make(map[apitypes.UID]map[string]*types.RuleMetric)
snooper.igmpReportANPStats = make(map[apitypes.UID]map[string]*types.RuleMetric)
Expand Down
11 changes: 5 additions & 6 deletions pkg/agent/types/multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"antrea.io/antrea/pkg/apis/crd/v1alpha1"
)

type McastNPValidationItem struct {
type IGMPNPRuleInfo struct {
RuleAction v1alpha1.RuleAction
UUID apitypes.UID
NPType *v1beta2.NetworkPolicyType
Expand All @@ -35,9 +35,8 @@ var (
_, McastCIDR, _ = net.ParseCIDR("224.0.0.0/4")
)

type MulticastValidator interface {
// Validate checks whether IGMP report from Pod(podNamespace/podName) to groupAddress should be dropped,
// and returns multicast NetworkPolicy information.
// TODO: refacor the function name and return type here
Validate(podname, podNamespace string, groupAddress net.IP, igmpType uint8) (McastNPValidationItem, error)
type McastNetworkPolicyController interface {
// GetIGMPNPRuleInfo looks up the IGMP NetworkPolicy rule that matches the given Pod and groupAddress,
// and returns the rule information if found.
GetIGMPNPRuleInfo(podname, podNamespace string, groupAddress net.IP, igmpType uint8) (*IGMPNPRuleInfo, error)
}
Loading

0 comments on commit 810488a

Please sign in to comment.