Skip to content

Commit

Permalink
Centralized leaked ENI cleanup- CNINode CRD changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sushrk committed Dec 13, 2024
1 parent 6706a5d commit 6e21841
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 25 deletions.
2 changes: 2 additions & 0 deletions apis/vpcresources/v1alpha1/cninode_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Feature struct {
// CNINodeSpec defines the desired state of CNINode
type CNINodeSpec struct {
Features []Feature `json:"features,omitempty"`
// Additional tag key/value added to all network interfaces provisioned by the vpc-resource-controller and VPC-CNI
Tags map[string]string `json:"tags,omitempty"`
}

// CNINodeStatus defines the managed VPC resources.
Expand Down
7 changes: 7 additions & 0 deletions apis/vpcresources/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions config/crd/bases/vpcresources.k8s.aws_cninodes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ spec:
type: string
type: object
type: array
tags:
additionalProperties:
type: string
description: Additional tag key/value added to all network interfaces
provisioned by the vpc-resource-controller and VPC-CNI
type: object
type: object
status:
description: CNINodeStatus defines the managed VPC resources.
Expand Down
3 changes: 1 addition & 2 deletions controllers/core/configmap_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -86,7 +85,7 @@ func (r *ConfigMapReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
r.K8sAPI,
utils.BranchENICoolDownUpdateReason,
fmt.Sprintf("Branch ENI cool down period has been updated to %s", cooldown.GetCoolDown().GetCoolDownPeriod()),
v1.EventTypeNormal,
corev1.EventTypeNormal,
r.Log,
)
}
Expand Down
5 changes: 2 additions & 3 deletions controllers/core/configmap_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -225,8 +224,8 @@ func Test_Reconcile_UpdateNode_Error(t *testing.T) {

}

func createCoolDownMockCM(cooldownTime string) *v1.ConfigMap {
return &v1.ConfigMap{
func createCoolDownMockCM(cooldownTime string) *corev1.ConfigMap {
return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: config.VpcCniConfigMapName,
Namespace: config.KubeSystemNamespace,
Expand Down
2 changes: 1 addition & 1 deletion controllers/core/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (r *PodReconciler) Reconcile(request custom.Request) (ctrl.Result, error) {
} else {
result, err = resourceHandler.HandleCreate(int(totalCount), pod)
}
if err != nil || result.Requeue == true {
if err != nil || result.Requeue {
return result, err
}
logger.V(1).Info("handled resource without error",
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func main() {
nodeManagerWorkers := asyncWorkers.NewDefaultWorkerPool("node async workers",
nodeWorkerCount, 1, ctrl.Log.WithName("node async workers"), ctx)
nodeManager, err := manager.NewNodeManager(ctrl.Log.WithName("node manager"), resourceManager,
apiWrapper, nodeManagerWorkers, controllerConditions, version.GitVersion, healthzHandler)
apiWrapper, nodeManagerWorkers, controllerConditions, clusterName, version.GitVersion, healthzHandler)

if err != nil {
ctrl.Log.Error(err, "failed to init node manager")
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
OSWindows = "windows"
// OSLinux is the the linux Operating System
OSLinux = "linux"
// Node termination finalizer on CNINode CRD
NodeTerminationFinalizer = "networking.k8s.aws/resource-cleanup"
)

// EC2 Tags
Expand All @@ -65,6 +67,7 @@ const (
NetworkInterfaceOwnerTagKey = "eks:eni:owner"
NetworkInterfaceOwnerTagValue = "eks-vpc-resource-controller"
NetworkInterfaceOwnerVPCCNITagValue = "amazon-vpc-cni"
CNINodeClusterNameKey = "cluster.k8s.amazonaws.com/name"
)

const (
Expand Down
84 changes: 84 additions & 0 deletions pkg/k8s/finalizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package k8s

import (
"context"

"github.com/go-logr/logr"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

type FinalizerManager interface {
AddFinalizers(ctx context.Context, object client.Object, finalizers ...string) error
RemoveFinalizers(ctx context.Context, object client.Object, finalizers ...string) error
}

func NewDefaultFinalizerManager(k8sClient client.Client, log logr.Logger) FinalizerManager {
return &defaultFinalizerManager{
k8sClient: k8sClient,
log: log,
}
}

type defaultFinalizerManager struct {
k8sClient client.Client
log logr.Logger
}

func (m *defaultFinalizerManager) AddFinalizers(ctx context.Context, obj client.Object, finalizers ...string) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := m.k8sClient.Get(ctx, NamespacedName(obj), obj); err != nil {
return err
}

oldObj := obj.DeepCopyObject().(client.Object)
needsUpdate := false
for _, finalizer := range finalizers {
if !controllerutil.ContainsFinalizer(obj, finalizer) {
m.log.Info("adding finalizer", "object", obj.GetObjectKind().GroupVersionKind().Kind, "name", obj.GetName(), "finalizer", finalizer)
controllerutil.AddFinalizer(obj, finalizer)
needsUpdate = true
}
}
if !needsUpdate {
return nil
}
return m.k8sClient.Patch(ctx, obj, client.MergeFromWithOptions(oldObj, client.MergeFromWithOptimisticLock{}))
})
}

func (m *defaultFinalizerManager) RemoveFinalizers(ctx context.Context, obj client.Object, finalizers ...string) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := m.k8sClient.Get(ctx, NamespacedName(obj), obj); err != nil {
return err
}

oldObj := obj.DeepCopyObject().(client.Object)
needsUpdate := false
for _, finalizer := range finalizers {
if controllerutil.ContainsFinalizer(obj, finalizer) {
m.log.Info("removing finalizer", "object", obj.GetObjectKind().GroupVersionKind().Kind, "name", obj.GetName(), "finalizer", finalizer)
controllerutil.RemoveFinalizer(obj, finalizer)
needsUpdate = true
}
}
if !needsUpdate {
return nil
}
return m.k8sClient.Patch(ctx, obj, client.MergeFromWithOptions(oldObj, client.MergeFromWithOptimisticLock{}))
})
}
23 changes: 23 additions & 0 deletions pkg/k8s/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package k8s

import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// NamespacedName returns the namespaced name for k8s objects
func NamespacedName(obj client.Object) client.ObjectKey {
return client.ObjectKeyFromObject(obj)
}
15 changes: 13 additions & 2 deletions pkg/k8s/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package k8s

import (
"context"
"fmt"
"strconv"

"github.com/aws/amazon-vpc-cni-k8s/pkg/apis/crd/v1alpha1"
Expand Down Expand Up @@ -80,7 +81,7 @@ type K8sWrapper interface {
AddLabelToManageNode(node *v1.Node, labelKey string, labelValue string) (bool, error)
ListEvents(ops []client.ListOption) (*eventsv1.EventList, error)
GetCNINode(namespacedName types.NamespacedName) (*rcv1alpha1.CNINode, error)
CreateCNINode(node *v1.Node) error
CreateCNINode(node *v1.Node, clusterName string) error
}

// k8sWrapper is the wrapper object with the client
Expand Down Expand Up @@ -233,7 +234,7 @@ func (k *k8sWrapper) GetCNINode(namespacedName types.NamespacedName) (*rcv1alpha
return cninode, nil
}

func (k *k8sWrapper) CreateCNINode(node *v1.Node) error {
func (k *k8sWrapper) CreateCNINode(node *v1.Node, clusterName string) error {
cniNode := &rcv1alpha1.CNINode{
ObjectMeta: metav1.ObjectMeta{
Name: node.Name,
Expand All @@ -248,6 +249,16 @@ func (k *k8sWrapper) CreateCNINode(node *v1.Node) error {
Controller: lo.ToPtr(true),
},
},
Labels: map[string]string{
// OS is a standard label & is set by Kubernetes, so we can skip checking if it is set
config.NodeLabelOS: node.ObjectMeta.Labels[config.NodeLabelOS],
},
Finalizers: []string{config.NodeTerminationFinalizer}, // finalizer to clean up leaked ENIs at node termination
},
Spec: rcv1alpha1.CNINodeSpec{
Tags: map[string]string{
fmt.Sprintf(config.CNINodeClusterNameKey): clusterName,
},
},
}

Expand Down
10 changes: 7 additions & 3 deletions pkg/k8s/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

var (
nodeName = "node-name"
mockClusterName = "cluster-name"
mockResourceName = config.ResourceNamePodENI

existingResource = "extended-resource"
Expand All @@ -45,6 +46,9 @@ var (
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
Labels: map[string]string{
config.NodeLabelOS: config.OSLinux,
},
},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Expand Down Expand Up @@ -196,20 +200,20 @@ func TestK8sWrapper_CreateCNINodeWithExistedObject_NoError(t *testing.T) {
ctrl := gomock.NewController(t)
wrapper, _, _ := getMockK8sWrapperWithClient(ctrl, []runtime.Object{mockCNINode})

err := wrapper.CreateCNINode(mockNode)
err := wrapper.CreateCNINode(mockNode, mockClusterName)
assert.NoError(t, err)
cniNode, err := wrapper.GetCNINode(types.NamespacedName{Name: mockNode.Name})
assert.NoError(t, err)
assert.Equal(t, mockNode.Name, cniNode.Name)
err = wrapper.CreateCNINode(mockNode)
err = wrapper.CreateCNINode(mockNode, mockClusterName)
assert.NoError(t, err)
}

func TestK8sWrapper_CreateCNINode_NoError(t *testing.T) {
ctrl := gomock.NewController(t)
wrapper, _, _ := getMockK8sWrapperWithClient(ctrl, []runtime.Object{mockCNINode})

err := wrapper.CreateCNINode(mockNode)
err := wrapper.CreateCNINode(mockNode, mockClusterName)
assert.NoError(t, err)
cniNode, err := wrapper.GetCNINode(types.NamespacedName{Name: mockNode.Name})
assert.NoError(t, err)
Expand Down
8 changes: 5 additions & 3 deletions pkg/node/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type manager struct {
conditions condition.Conditions
controllerVersion string
stopHealthCheckAt time.Time
clusterName string
}

// Manager to perform operation on list of managed/un-managed node
Expand Down Expand Up @@ -102,7 +103,7 @@ const pausingHealthCheckDuration = 10 * time.Minute

// NewNodeManager returns a new node manager
func NewNodeManager(logger logr.Logger, resourceManager resource.ResourceManager,
wrapper api.Wrapper, worker asyncWorker.Worker, conditions condition.Conditions, controllerVersion string, healthzHandler *rcHealthz.HealthzHandler) (Manager, error) {
wrapper api.Wrapper, worker asyncWorker.Worker, conditions condition.Conditions, clusterName string, controllerVersion string, healthzHandler *rcHealthz.HealthzHandler) (Manager, error) {

manager := &manager{
resourceManager: resourceManager,
Expand All @@ -112,6 +113,7 @@ func NewNodeManager(logger logr.Logger, resourceManager resource.ResourceManager
worker: worker,
conditions: conditions,
controllerVersion: controllerVersion,
clusterName: clusterName,
}

// add health check on subpath for node manager
Expand Down Expand Up @@ -228,7 +230,7 @@ func (m *manager) CreateCNINodeIfNotExisting(node *v1.Node) error {
); err != nil {
if apierrors.IsNotFound(err) {
m.Log.Info("Will create a new CNINode", "CNINodeName", node.Name)
return m.wrapper.K8sAPI.CreateCNINode(node)
return m.wrapper.K8sAPI.CreateCNINode(node, m.clusterName)
}
return err
} else {
Expand Down Expand Up @@ -459,7 +461,7 @@ func (m *manager) performAsyncOperation(job interface{}) (ctrl.Result, error) {
log.V(1).Info("successfully performed node operation")
return ctrl.Result{}, nil
}
log.Error(err, "failed to performed node operation")
log.Error(err, "failed to perform node operation")

return ctrl.Result{}, nil
}
Expand Down
Loading

0 comments on commit 6e21841

Please sign in to comment.