fix gc controller not handling empty cluster
Signed-off-by: Zhiwei Yin <[email protected]>
zhiweiyin318 committed Jan 20, 2025
1 parent f03b3f7 commit 88df560
Showing 7 changed files with 272 additions and 148 deletions.
85 changes: 57 additions & 28 deletions pkg/registration/hub/gc/controller.go
@@ -7,13 +7,14 @@ import (

"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
operatorhelpers "github.com/openshift/library-go/pkg/operator/v1helpers"
"k8s.io/apimachinery/pkg/api/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
rbacv1listers "k8s.io/client-go/listers/rbac/v1"
"k8s.io/client-go/metadata"
"k8s.io/klog/v2"
@@ -47,21 +48,22 @@
// gcReconciler is an interface for reconciling cleanup logic after a cluster is deleted.
// clusterName comes from the queueKey; cluster may be nil.
type gcReconciler interface {
reconcile(ctx context.Context, cluster *clusterv1.ManagedCluster) (gcReconcileOp, error)
reconcile(ctx context.Context, cluster *clusterv1.ManagedCluster, clusterNamespace *corev1.Namespace) (gcReconcileOp, error)
}

type GCController struct {
clusterLister clusterv1listers.ManagedClusterLister
clusterPatcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
gcReconcilers []gcReconciler
clusterLister clusterv1listers.ManagedClusterLister
namespaceLister corelisters.NamespaceLister
clusterPatcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
gcReconcilers []gcReconciler
}

// NewGCController ensures the related resources are cleaned up after the cluster is deleted
func NewGCController(
clusterRoleLister rbacv1listers.ClusterRoleLister,
clusterRoleBindingLister rbacv1listers.ClusterRoleBindingLister,
roleBindingLister rbacv1listers.RoleBindingLister,
clusterInformer informerv1.ManagedClusterInformer,
namespaceLister corelisters.NamespaceLister,
manifestWorkLister worklister.ManifestWorkLister,
clusterClient clientset.Interface,
kubeClient kubernetes.Interface,
@@ -76,9 +78,10 @@ func NewGCController(
clusterClient.ClusterV1().ManagedClusters())

controller := &GCController{
clusterLister: clusterInformer.Lister(),
clusterPatcher: clusterPatcher,
gcReconcilers: []gcReconciler{},
clusterLister: clusterInformer.Lister(),
namespaceLister: namespaceLister,
clusterPatcher: clusterPatcher,
gcReconcilers: []gcReconciler{},
}

// do not clean resources if the featureGate is disabled or there is no gc resource list, for backwards compatibility.
@@ -98,9 +101,8 @@ func NewGCController(
}

controller.gcReconcilers = append(controller.gcReconcilers,
newGCClusterRbacController(kubeClient, clusterPatcher, clusterInformer, clusterRoleLister,
clusterRoleBindingLister, roleBindingLister, manifestWorkLister, approver, eventRecorder,
resourceCleanupFeatureGateEnable))
newGCClusterRbacController(kubeClient, clusterPatcher, clusterRoleLister, roleBindingLister,
manifestWorkLister, approver, eventRecorder, resourceCleanupFeatureGateEnable))

return factory.New().
WithInformersQueueKeysFunc(queue.QueueKeyByMetaName, clusterInformer.Informer()).
@@ -109,26 +111,43 @@

// gc controller watches the cluster and does these jobs:
// 1. add a cleanup finalizer to managedCluster if the cluster is not deleting.
// 2. clean up all rbac and resources in the cluster ns after the cluster is deleted.
// 2. clean up all roleBindings and resources in the cluster ns after the cluster is deleted.
func (r *GCController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
clusterName := controllerContext.QueueKey()
if clusterName == "" || clusterName == factory.DefaultQueueKey {
return nil
}

originalCluster, err := r.clusterLister.Get(clusterName)
switch {
case errors.IsNotFound(err):
return nil
case err != nil:
// cluster could be nil, which means the cluster is gone but the gc is not finished.
cluster, err := r.clusterLister.Get(clusterName)
if err != nil && !apierrors.IsNotFound(err) {
return err
}

if cluster != nil && cluster.DeletionTimestamp.IsZero() {
_, err := r.clusterPatcher.AddFinalizer(ctx, cluster, clusterv1.ManagedClusterFinalizer)
return err
}

cluster := originalCluster.DeepCopy()
// clusterNamespace could be nil, which means there are no resources waiting for cleanup.
clusterNamespace, err := r.namespaceLister.Get(clusterName)
if err != nil && !apierrors.IsNotFound(err) {
return err
}

if cluster == nil && clusterNamespace == nil {
return nil
}

var copyCluster *clusterv1.ManagedCluster
if cluster != nil {
copyCluster = cluster.DeepCopy()
}

var errs []error
var requeue bool
for _, reconciler := range r.gcReconcilers {
op, err := reconciler.reconcile(ctx, cluster)
op, err := reconciler.reconcile(ctx, copyCluster, clusterNamespace)
if err != nil {
errs = append(errs, err)
}
@@ -139,23 +158,33 @@ func (r *GCController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
break
}
}
// update cluster condition firstly

if requeue {
controllerContext.Queue().AddAfter(clusterName, 1*time.Second)
}

if cluster == nil {
return utilerrors.NewAggregate(errs)
}

// update cluster condition
if len(errs) != 0 {
applyErrors := operatorhelpers.NewMultiLineAggregate(errs)
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
applyErrors := utilerrors.NewAggregate(errs)
meta.SetStatusCondition(&copyCluster.Status.Conditions, metav1.Condition{
Type: clusterv1.ManagedClusterConditionDeleting,
Status: metav1.ConditionFalse,
Reason: clusterv1.ConditionDeletingReasonResourceError,
Message: applyErrors.Error(),
})
}

if _, err = r.clusterPatcher.PatchStatus(ctx, cluster, cluster.Status, originalCluster.Status); err != nil {
if _, err = r.clusterPatcher.PatchStatus(ctx, cluster, copyCluster.Status, cluster.Status); err != nil {
errs = append(errs, err)
}

if requeue {
controllerContext.Queue().AddAfter(clusterName, 1*time.Second)
if len(errs) != 0 || requeue {
return utilerrors.NewAggregate(errs)
}
return utilerrors.NewAggregate(errs)

return r.clusterPatcher.RemoveFinalizer(ctx, cluster, clusterv1.ManagedClusterFinalizer)
}
50 changes: 40 additions & 10 deletions pkg/registration/hub/gc/controller_test.go
@@ -7,11 +7,14 @@ import (

"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
kubeinformers "k8s.io/client-go/informers"
fakeclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
fakemetadataclient "k8s.io/client-go/metadata/fake"
clienttesting "k8s.io/client-go/testing"

fakeclusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
@@ -27,27 +30,49 @@

func TestGController(t *testing.T) {
cases := []struct {
name string
key string
cluster *clusterv1.ManagedCluster
expectedErr string
name string
key string
cluster *clusterv1.ManagedCluster
namespace *corev1.Namespace
expectedErr string
validateActions func(t *testing.T, clusterActions []clienttesting.Action)
}{
{
name: "invalid key",
key: factory.DefaultQueueKey,
cluster: testinghelpers.NewDeletingManagedCluster(),
expectedErr: "",
validateActions: func(t *testing.T, clusterActions []clienttesting.Action) {
testingcommon.AssertNoActions(t, clusterActions)
},
},
{
name: "valid key",
name: "valid key no namespace",
key: testinghelpers.TestManagedClusterName,
cluster: testinghelpers.NewDeletingManagedCluster(),
expectedErr: "",
validateActions: func(t *testing.T, clusterActions []clienttesting.Action) {
testingcommon.AssertActions(t, clusterActions, "patch")
},
},
{
name: "valid key with cluster and namespace",
key: testinghelpers.TestManagedClusterName,
cluster: testinghelpers.NewDeletingManagedCluster(),
namespace: newNamespace(testinghelpers.TestManagedClusterName),
expectedErr: "",
validateActions: func(t *testing.T, clusterActions []clienttesting.Action) {
testingcommon.AssertActions(t, clusterActions, "patch", "patch")
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
kubeClient := fakeclient.NewSimpleClientset()
objs := []runtime.Object{}
if c.namespace != nil {
objs = append(objs, c.namespace)
}
kubeClient := fakeclient.NewSimpleClientset(objs...)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10)

metadataClient := fakemetadataclient.NewSimpleMetadataClient(scheme.Scheme)
@@ -66,9 +91,9 @@ func TestGController(t *testing.T) {

_ = NewGCController(
kubeInformerFactory.Rbac().V1().ClusterRoles().Lister(),
kubeInformerFactory.Rbac().V1().ClusterRoleBindings().Lister(),
kubeInformerFactory.Rbac().V1().RoleBindings().Lister(),
clusterInformerFactory.Cluster().V1().ManagedClusters(),
kubeInformerFactory.Core().V1().Namespaces().Lister(),
workInformerFactory.Work().V1().ManifestWorks().Lister(),
clusterClient,
kubeClient,
@@ -79,7 +104,12 @@ func TestGController(t *testing.T) {
"work.open-cluster-management.io/v1/manifestworks"},
true,
)

namespaceStore := kubeInformerFactory.Core().V1().Namespaces().Informer().GetStore()
if c.namespace != nil {
if err := namespaceStore.Add(c.namespace); err != nil {
t.Fatal(err)
}
}
clusterPatcher := patcher.NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters())
@@ -91,20 +121,20 @@ func TestGController(t *testing.T) {
newGCResourcesController(metadataClient, []schema.GroupVersionResource{addonGvr, workGvr},
events.NewInMemoryRecorder("")),
newGCClusterRbacController(kubeClient, clusterPatcher,
clusterInformerFactory.Cluster().V1().ManagedClusters(),
kubeInformerFactory.Rbac().V1().ClusterRoles().Lister(),
kubeInformerFactory.Rbac().V1().ClusterRoleBindings().Lister(),
kubeInformerFactory.Rbac().V1().RoleBindings().Lister(),
workInformerFactory.Work().V1().ManifestWorks().Lister(),
register.NewNoopApprover(),
events.NewInMemoryRecorder(""),
true),
},
namespaceLister: kubeInformerFactory.Core().V1().Namespaces().Lister(),
}

controllerContext := testingcommon.NewFakeSyncContext(t, c.key)
err := ctrl.sync(context.TODO(), controllerContext)
testingcommon.AssertError(t, err, c.expectedErr)
c.validateActions(t, clusterClient.Actions())
})
}
}
63 changes: 27 additions & 36 deletions pkg/registration/hub/gc/gc_cluster_rbac.go
@@ -7,15 +7,13 @@ import (
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
operatorhelpers "github.com/openshift/library-go/pkg/operator/v1helpers"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
rbacv1listers "k8s.io/client-go/listers/rbac/v1"
"k8s.io/klog/v2"

informerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
worklister "open-cluster-management.io/api/client/work/listers/work/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/sdk-go/pkg/patcher"
@@ -46,9 +44,7 @@

type gcClusterRbacController struct {
kubeClient kubernetes.Interface
clusterLister clusterv1listers.ManagedClusterLister
clusterRoleLister rbacv1listers.ClusterRoleLister
clusterRoleBingLister rbacv1listers.ClusterRoleBindingLister
roleBindingLister rbacv1listers.RoleBindingLister
manifestWorkLister worklister.ManifestWorkLister
clusterPatcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
@@ -61,9 +57,7 @@ type gcClusterRbacController struct {
func newGCClusterRbacController(
kubeClient kubernetes.Interface,
clusterPatcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus],
clusterInformer informerv1.ManagedClusterInformer,
clusterRoleLister rbacv1listers.ClusterRoleLister,
clusterRoleBindingLister rbacv1listers.ClusterRoleBindingLister,
roleBindingLister rbacv1listers.RoleBindingLister,
manifestWorkLister worklister.ManifestWorkLister,
approver register.Approver,
@@ -73,9 +67,7 @@ func newGCClusterRbacController(

return &gcClusterRbacController{
kubeClient: kubeClient,
clusterLister: clusterInformer.Lister(),
clusterRoleLister: clusterRoleLister,
clusterRoleBingLister: clusterRoleBindingLister,
roleBindingLister: roleBindingLister,
manifestWorkLister: manifestWorkLister,
clusterPatcher: clusterPatcher,
@@ -85,43 +77,42 @@ func newGCClusterRbacController(
}
}

func (r *gcClusterRbacController) reconcile(ctx context.Context, cluster *clusterv1.ManagedCluster) (gcReconcileOp, error) {
if cluster.DeletionTimestamp.IsZero() {
_, err := r.clusterPatcher.AddFinalizer(ctx, cluster, clusterv1.ManagedClusterFinalizer)
return gcReconcileStop, err
func (r *gcClusterRbacController) reconcile(ctx context.Context,
cluster *clusterv1.ManagedCluster, clusterNamespace *corev1.Namespace) (gcReconcileOp, error) {
if cluster == nil && clusterNamespace == nil {
return gcReconcileStop, nil
}

if err := r.removeClusterRbac(ctx, cluster.Name, cluster.Spec.HubAcceptsClient); err != nil {
return gcReconcileContinue, err
}

if err := r.approver.Cleanup(ctx, cluster); err != nil {
return gcReconcileContinue, err
}
if cluster != nil {
if err := r.removeClusterRbac(ctx, cluster.Name, cluster.Spec.HubAcceptsClient); err != nil {
return gcReconcileContinue, err
}

works, err := r.manifestWorkLister.ManifestWorks(cluster.Name).List(labels.Everything())
if err != nil && !errors.IsNotFound(err) {
return gcReconcileStop, err
}
if len(works) != 0 {
klog.V(2).Infof("cluster %s is deleting, waiting %d works in the cluster namespace to be deleted.",
cluster.Name, len(works))
if err := r.approver.Cleanup(ctx, cluster); err != nil {
return gcReconcileContinue, err
}

// remove the finalizer to delete the cluster, for backwards compatibility.
// if the GC feature gate is disabled, the finalizer is removed from the cluster after the related rbac is deleted.
// there is no need to wait for other resources to be cleaned up before removing the finalizer.
if !r.resourceCleanupFeatureGateEnable {
return gcReconcileStop, r.clusterPatcher.RemoveFinalizer(ctx, cluster, clusterv1.ManagedClusterFinalizer)
if err := r.clusterPatcher.RemoveFinalizer(ctx, cluster, clusterv1.ManagedClusterFinalizer); err != nil {
return gcReconcileStop, err
}
}
return gcReconcileRequeue, nil
}

if err = r.removeFinalizerFromWorkRoleBinding(ctx, cluster.Name, manifestWorkFinalizer); err != nil {
return gcReconcileStop, err
if clusterNamespace != nil {
works, err := r.manifestWorkLister.ManifestWorks(clusterNamespace.Name).List(labels.Everything())
if err != nil && !errors.IsNotFound(err) {
return gcReconcileStop, err
}
if len(works) != 0 {
return gcReconcileRequeue, nil
}
return gcReconcileStop, r.removeFinalizerFromWorkRoleBinding(ctx, clusterNamespace.Name, manifestWorkFinalizer)
}

r.eventRecorder.Eventf("ManagedClusterGC",
"managed cluster %s is deleting and the cluster rbac are deleted", cluster.Name)

return gcReconcileContinue, r.clusterPatcher.RemoveFinalizer(ctx, cluster, clusterv1.ManagedClusterFinalizer)
return gcReconcileStop, nil
}

func (r *gcClusterRbacController) removeClusterRbac(ctx context.Context, clusterName string, accepted bool) error {