Skip to content

Commit

Permalink
Improve Egress API visibility
Browse files Browse the repository at this point in the history
Record event when EgressIP is assigned to the Node interface.

Signed-off-by: Pulkit Jain <[email protected]>
  • Loading branch information
Pulkit Jain committed Dec 20, 2023
1 parent faec22a commit f20c9b2
Show file tree
Hide file tree
Showing 15 changed files with 89 additions and 17 deletions.
6 changes: 6 additions & 0 deletions build/charts/antrea/templates/agent/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,9 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
6 changes: 6 additions & 0 deletions build/yamls/antrea-aks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6256,6 +6256,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6256,6 +6256,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6256,6 +6256,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6269,6 +6269,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6256,6 +6256,12 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- create
---
# Source: antrea/templates/antctl/clusterrole.yaml
kind: ClusterRole
Expand Down
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func run(o *Options) error {
}
if o.enableEgress {
egressController, err = egress.NewEgressController(
ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName,
k8sClient, ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName,
memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, serviceCIDRProvider, o.config.Egress.MaxEgressIPsPerNode,
features.DefaultFeatureGate.Enabled(features.EgressTrafficShaping),
)
Expand Down
30 changes: 29 additions & 1 deletion pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
Expand All @@ -49,6 +54,7 @@ import (
cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
crdv1b1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
clientsetversioned "antrea.io/antrea/pkg/client/clientset/versioned"
"antrea.io/antrea/pkg/client/clientset/versioned/scheme"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1beta1"
crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1beta1"
"antrea.io/antrea/pkg/controller/metrics"
Expand Down Expand Up @@ -175,9 +181,12 @@ type EgressController struct {
serviceCIDRUpdateRetryDelay time.Duration

trafficShapingEnabled bool

record record.EventRecorder
}

func NewEgressController(
client kubernetes.Interface,
ofClient openflow.Client,
antreaClientGetter agent.AntreaClientProvider,
crdClient clientsetversioned.Interface,
Expand All @@ -196,6 +205,18 @@ func NewEgressController(
if trafficShapingEnabled && !openflow.OVSMetersAreSupported() {
klog.Info("EgressTrafficShaping feature gate is enabled, but it is ignored because OVS meters are not supported.")
}

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
runtime.Must(scheme.AddToScheme(k8sscheme.Scheme))
eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{
Interface: client.CoreV1().Events(""),
})
recorder := eventBroadcaster.NewRecorder(
k8sscheme.Scheme,
corev1.EventSource{Component: "egress-controller"},
)

c := &EgressController{
ofClient: ofClient,
routeClient: routeClient,
Expand All @@ -220,6 +241,8 @@ func NewEgressController(
serviceCIDRUpdateRetryDelay: 10 * time.Second,

trafficShapingEnabled: openflow.OVSMetersAreSupported() && trafficShapingEnabled,

record: recorder,
}
ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice)
if err != nil {
Expand Down Expand Up @@ -745,6 +768,7 @@ func (c *EgressController) updateEgressStatus(egress *crdv1b1.Egress, egressIP s
Message: fmt.Sprintf("Failed to assign the IP to EgressNode: %v", scheduleErr),
},
}
c.record.Eventf(egress, corev1.EventTypeNormal, "IPUnassigned", "Unassigned egress %s with IP %s", egress.Name, egress.Spec.EgressIP)
}
} else {
// The Egress IP is assigned to a Node (egressIP != "") but it's not this Node (isLocal == false), do nothing.
Expand Down Expand Up @@ -848,9 +872,13 @@ func (c *EgressController) syncEgress(egressName string) error {
// Ensure the Egress IP is assigned to the system. Force advertising the IP if it was previously assigned to
// another Node in the Egress API. This could force refreshing other peers' neighbor cache when the Egress IP is
// obtained by this Node and another Node at the same time in some situations, e.g. split brain.
if err := c.ipAssigner.AssignIP(desiredEgressIP, egress.Status.EgressNode != c.nodeName); err != nil {
exists, err := c.ipAssigner.AssignIP(desiredEgressIP, egress.Status.EgressNode != c.nodeName)
if err != nil {
return err
}
if exists {
c.record.Eventf(egress, corev1.EventTypeNormal, "IPAssigned", "Assigned egress %s with IP %s on node %s", egress.Name, desiredEgressIP, desiredNode)
}
} else {
// Unassign the Egress IP from the local Node if it was assigned by the agent.
if err := c.ipAssigner.UnassignIP(desiredEgressIP); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/egress/egress_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100)
mockServiceCIDRProvider := servicecidrtest.NewMockInterface(controller)
mockServiceCIDRProvider.EXPECT().AddEventHandler(gomock.Any())
egressController, _ := NewEgressController(mockOFClient,
egressController, _ := NewEgressController(k8sClient, mockOFClient,
&antreaClientGetter{clientset},
crdClient,
ifaceStore,
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/serviceexternalip/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (c *ServiceExternalIPController) assignIP(ip string, service apimachineryty
c.assignedIPsMutex.Lock()
defer c.assignedIPsMutex.Unlock()
if _, ok := c.assignedIPs[ip]; !ok {
if err := c.ipAssigner.AssignIP(ip, true); err != nil {
if _, err := c.ipAssigner.AssignIP(ip, true); err != nil {
return err
}
c.assignedIPs[ip] = sets.New[string](service.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/ipassigner/ip_assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import "k8s.io/apimachinery/pkg/util/sets"
// IPAssigner provides methods to assign or unassign IP.
type IPAssigner interface {
// AssignIP ensures the provided IP is assigned to the system.
// The returned bool is true when the IP was newly assigned by this call.
AssignIP(ip string, forceAdvertise bool) error
AssignIP(ip string, forceAdvertise bool) (bool, error)
// UnassignIP ensures the provided IP is not assigned to the system.
UnassignIP(ip string) error
// AssignedIPs returns the IPs that are assigned to the system by this IPAssigner.
Expand Down
14 changes: 7 additions & 7 deletions pkg/agent/ipassigner/ip_assigner_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ func (a *ipAssigner) loadIPAddresses() (sets.Set[string], error) {
}

// AssignIP ensures the provided IP is assigned to the dummy device and the ARP/NDP responders.
func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error {
func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) (bool, error) {
parsedIP := net.ParseIP(ip)
if parsedIP == nil {
return fmt.Errorf("invalid IP %s", ip)
return false, fmt.Errorf("invalid IP %s", ip)
}
a.mutex.Lock()
defer a.mutex.Unlock()
Expand All @@ -156,14 +156,14 @@ func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error {
if forceAdvertise {
a.advertise(parsedIP)
}
return nil
return false, nil
}

if a.dummyDevice != nil {
addr := util.NewIPNet(parsedIP)
if err := netlink.AddrAdd(a.dummyDevice, &netlink.Addr{IPNet: addr}); err != nil {
if !errors.Is(err, unix.EEXIST) {
return fmt.Errorf("failed to add IP %v to interface %s: %v", ip, a.dummyDevice.Attrs().Name, err)
return false, fmt.Errorf("failed to add IP %v to interface %s: %v", ip, a.dummyDevice.Attrs().Name, err)
} else {
klog.InfoS("IP was already assigned to interface", "ip", parsedIP, "interface", a.dummyDevice.Attrs().Name)
}
Expand All @@ -174,18 +174,18 @@ func (a *ipAssigner) AssignIP(ip string, forceAdvertise bool) error {

if utilnet.IsIPv4(parsedIP) && a.arpResponder != nil {
if err := a.arpResponder.AddIP(parsedIP); err != nil {
return fmt.Errorf("failed to assign IP %v to ARP responder: %v", ip, err)
return false, fmt.Errorf("failed to assign IP %v to ARP responder: %v", ip, err)
}
}
if utilnet.IsIPv6(parsedIP) && a.ndpResponder != nil {
if err := a.ndpResponder.AddIP(parsedIP); err != nil {
return fmt.Errorf("failed to assign IP %v to NDP responder: %v", ip, err)
return false, fmt.Errorf("failed to assign IP %v to NDP responder: %v", ip, err)
}
}
// Always advertise the IP when the IP is newly assigned to this Node.
a.advertise(parsedIP)
a.assignedIPs.Insert(ip)
return nil
return true, nil
}

func (a *ipAssigner) advertise(ip net.IP) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/agent/ipassigner/testing/mock_ipassigner.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions test/e2e/egress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,17 @@ func testEgressCRUD(t *testing.T, data *TestData) {
return true, nil
})
require.NoError(t, err, "Expected egressIP=%s nodeName in %s, got egressIP=%s nodeName=%s", tt.expectedEgressIP, sets.List(tt.expectedNodes), egress.Spec.EgressIP, egress.Status.EgressNode)

if egress.Status.EgressNode != "" {
exists, err := hasIP(data, egress.Status.EgressNode, egress.Spec.EgressIP)
require.NoError(t, err, "Failed to check if IP exists on Node")
assert.True(t, exists, "Didn't find desired IP on Node")
// Testing the events recorded during creation of an Egress resource.
expectedMessage := fmt.Sprintf("Assigned egress %s with IP %s on node", egress.Name, tt.expectedEgressIP)
fieldSelector := fmt.Sprintf("involvedObject.name=%s", egress.Name)
events, err := data.clientset.CoreV1().Events("").List(context.TODO(), metav1.ListOptions{FieldSelector: fieldSelector})
require.NoError(t, err)
assert.Contains(t, events.Items[0].Message, expectedMessage)
}

checkEIPStatus := func(expectedUsed int) {
Expand Down
4 changes: 2 additions & 2 deletions test/integration/agent/ip_assigner_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestIPAssigner(t *testing.T) {
require.NoError(t, err, "Failed to find the dummy device")
defer netlink.LinkDel(dummyDevice)

err = ipAssigner.AssignIP("x", false)
_, err = ipAssigner.AssignIP("x", false)
assert.Error(t, err, "Assigning an invalid IP should fail")

ip1 := "10.10.10.10"
Expand All @@ -49,7 +49,7 @@ func TestIPAssigner(t *testing.T) {
desiredIPs := sets.New[string](ip1, ip2, ip3)

for ip := range desiredIPs {
errAssign := ipAssigner.AssignIP(ip, false)
_, errAssign := ipAssigner.AssignIP(ip, false)
cmd := exec.Command("ip", "addr")
out, err := cmd.CombinedOutput()
if err != nil {
Expand Down

0 comments on commit f20c9b2

Please sign in to comment.