diff --git a/cmd/mcs-controller-manager/main.go b/cmd/mcs-controller-manager/main.go index 48975bef..0fc2bad4 100644 --- a/cmd/mcs-controller-manager/main.go +++ b/cmd/mcs-controller-manager/main.go @@ -9,6 +9,7 @@ package main import ( "context" "flag" + "go.goms.io/fleet/pkg/utils/controller" "os" "os/signal" "sync" @@ -241,12 +242,13 @@ func setupControllersWithManager(_ context.Context, hubMgr, memberMgr manager.Ma hubClient := hubMgr.GetClient() klog.V(1).InfoS("Create multiclusterservice reconciler") - if err := (&multiclusterservice.Reconciler{ + mcs := &multiclusterservice.Reconciler{ Client: memberClient, Scheme: memberMgr.GetScheme(), FleetSystemNamespace: *fleetSystemNamespace, Recorder: memberMgr.GetEventRecorderFor(multiclusterservice.ControllerName), - }).SetupWithManager(memberMgr); err != nil { + } + if err := mcs.SetupWithManager(memberMgr); err != nil { klog.ErrorS(err, "Unable to create multiclusterservice reconciler") return err } @@ -269,6 +271,7 @@ func setupControllersWithManager(_ context.Context, hubMgr, memberMgr manager.Ma MemberClient: memberClient, HubClient: hubClient, AgentType: clusterv1beta1.MultiClusterServiceAgent, + Controllers: []controller.MemberController{mcs}, }).SetupWithManager(hubMgr); err != nil { klog.ErrorS(err, "Unable to create internalmembercluster (v1beta1 API) reconciler") return err diff --git a/cmd/member-net-controller-manager/main.go b/cmd/member-net-controller-manager/main.go index 9548a0ae..53309d1e 100644 --- a/cmd/member-net-controller-manager/main.go +++ b/cmd/member-net-controller-manager/main.go @@ -10,6 +10,7 @@ package main import ( "context" "flag" + "go.goms.io/fleet/pkg/utils/controller" "os" "os/signal" "sync" @@ -259,79 +260,94 @@ func setupControllersWithManager(ctx context.Context, hubMgr, memberMgr manager. memberClient := memberMgr.GetClient() hubClient := hubMgr.GetClient() + var controllers []controller.MemberController klog.V(1).InfoS("Create endpointslice controller") - if err := (&endpointslice.Reconciler{ + endpointSliceController := &endpointslice.Reconciler{ MemberClusterID: mcName, MemberClient: memberClient, HubClient: hubClient, HubNamespace: mcHubNamespace, - }).SetupWithManager(ctx, memberMgr); err != nil { + } + if err := endpointSliceController.SetupWithManager(ctx, memberMgr); err != nil { klog.ErrorS(err, "Unable to create endpointslice controller") return err } + controllers = append(controllers, endpointSliceController) klog.V(1).InfoS("Create endpointsliceexport controller") - if err := (&endpointsliceexport.Reconciler{ + endpointSliceExportController := &endpointsliceexport.Reconciler{ MemberClient: memberClient, HubClient: hubClient, - }).SetupWithManager(hubMgr); err != nil { + } + if err := endpointSliceExportController.SetupWithManager(hubMgr); err != nil { klog.ErrorS(err, "Unable to create endpointsliceexport controller") return err } + controllers = append(controllers, endpointSliceExportController) klog.V(1).InfoS("Create endpointsliceimport controller") - if err := (&endpointsliceimport.Reconciler{ + endpointSliceImportController := &endpointsliceimport.Reconciler{ MemberClusterID: mcName, MemberClient: memberClient, HubClient: hubClient, FleetSystemNamespace: *fleetSystemNamespace, - }).SetupWithManager(ctx, memberMgr, hubMgr); err != nil { + } + if err := endpointSliceImportController.SetupWithManager(ctx, memberMgr, hubMgr); err != nil { klog.ErrorS(err, "Unable to create endpointsliceimport controller") return err } + controllers = append(controllers, endpointSliceImportController) klog.V(1).InfoS("Create internalserviceexport controller") - if err := (&internalserviceexport.Reconciler{ + internalServiceExportController := &internalserviceexport.Reconciler{ MemberClusterID: mcName, MemberClient: memberClient, HubClient: hubClient, Recorder: memberMgr.GetEventRecorderFor(internalserviceexport.ControllerName), - }).SetupWithManager(hubMgr); err != nil { + } + if err := internalServiceExportController.SetupWithManager(hubMgr); err != nil { klog.ErrorS(err, "Unable to create internalserviceexport controller") return err } + controllers = append(controllers, internalServiceExportController) klog.V(1).InfoS("Create internalserviceimport controller") - if err := (&internalserviceimport.Reconciler{ + internalServiceImportController := &internalserviceimport.Reconciler{ MemberClient: memberClient, HubClient: hubClient, - }).SetupWithManager(hubMgr); err != nil { + } + if err := internalServiceImportController.SetupWithManager(hubMgr); err != nil { klog.ErrorS(err, "Unable to create internalserviceimport controller") return err } + controllers = append(controllers, internalServiceImportController) klog.V(1).InfoS("Create serviceexport reconciler") - if err := (&serviceexport.Reconciler{ + serviceExportController := &serviceexport.Reconciler{ MemberClient: memberClient, HubClient: hubClient, MemberClusterID: mcName, HubNamespace: mcHubNamespace, Recorder: memberMgr.GetEventRecorderFor(serviceexport.ControllerName), - }).SetupWithManager(memberMgr); err != nil { + } + if err := serviceExportController.SetupWithManager(memberMgr); err != nil { klog.ErrorS(err, "Unable to create serviceexport reconciler") return err } + controllers = append(controllers, serviceExportController) klog.V(1).InfoS("Create serviceimport reconciler") - if err := (&serviceimport.Reconciler{ + serviceImportController := &serviceimport.Reconciler{ MemberClient: memberClient, HubClient: hubClient, MemberClusterID: mcName, HubNamespace: mcHubNamespace, - }).SetupWithManager(memberMgr); err != nil { + } + if err := serviceImportController.SetupWithManager(memberMgr); err != nil { klog.ErrorS(err, "Unable to create serviceimport reconciler") return err } + controllers = append(controllers, serviceImportController) if *isV1Alpha1APIEnabled { klog.V(1).InfoS("Create internalmembercluster (v1alpha1 API) reconciler") @@ -351,6 +367,7 @@ func setupControllersWithManager(ctx context.Context, hubMgr, memberMgr manager. MemberClient: memberClient, HubClient: hubClient, AgentType: clusterv1beta1.ServiceExportImportAgent, + Controllers: controllers, }).SetupWithManager(hubMgr); err != nil { klog.ErrorS(err, "Unable to create internalmembercluster (v1beta1 API) reconciler") return err diff --git a/go.mod b/go.mod index 5cdc6664..69f9a889 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,8 @@ require ( require ( github.com/stretchr/testify v1.9.0 - go.goms.io/fleet v0.10.5 + go.goms.io/fleet v0.10.8 + golang.org/x/sync v0.7.0 ) require ( diff --git a/go.sum b/go.sum index 859000df..0b00707c 100644 --- a/go.sum +++ b/go.sum @@ -94,8 +94,10 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.goms.io/fleet v0.10.5 h1:Zc+pLk77zWv0hAqBbFZEMMd05MVw9P8jp8YHTy7WPdI= -go.goms.io/fleet v0.10.5/go.mod h1:FpVP3YsiewmyGH77Yx6sLngHbZKgepnmJDIibz2pjZo= +go.goms.io/fleet v0.10.7 h1:kVPcH+XPO894chIoHlMK0cNIi7xDAqy771yIAk4bQIQ= +go.goms.io/fleet v0.10.7/go.mod h1:2MaaOUGGespUMwgy64MBIMXELv8lDJq+0/NyS3OGzTw= +go.goms.io/fleet v0.10.8 h1:AAK4wr4uKB8ATMhC4cpCKYAq9lMr9XLYE5QE+vkBf5M= +go.goms.io/fleet v0.10.8/go.mod h1:2MaaOUGGespUMwgy64MBIMXELv8lDJq+0/NyS3OGzTw= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -120,6 +122,8 @@ golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/controllers/member/endpointslice/controller.go b/pkg/controllers/member/endpointslice/controller.go index 5da51b64..ed771e74 100644 --- a/pkg/controllers/member/endpointslice/controller.go +++ b/pkg/controllers/member/endpointslice/controller.go @@ -11,6 +11,7 @@ import ( "context" "fmt" "strconv" + "sync/atomic" "time" discoveryv1 "k8s.io/api/discovery/v1" @@ -53,6 +54,8 @@ type Reconciler struct { HubClient client.Client // The namespace reserved for the current member cluster in the hub cluster. HubNamespace string + // whether to start exporting an EndpointSlice + joined atomic.Bool } //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=endpointsliceexports,verbs=get;list;watch;create;update;patch;delete @@ -110,6 +113,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{}, nil } + if !r.joined.Load() { + klog.V(2).InfoS("EndpointSlice controller is not started yet, requeue the request", "endpointSlice", endpointSliceRef) + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + // Retrieve the unique name assigned; if none has been assigned, or the one assigned is not valid, possibly due // to user tampering with the annotation, assign a new unique name. fleetUniqueName, ok := endpointSlice.Annotations[objectmeta.ExportedObjectAnnotationUniqueName] @@ -439,3 +447,24 @@ func (r *Reconciler) annotateLastSeenGenerationAndTimestamp(ctx context.Context, endpointSlice.Annotations[metrics.MetricsAnnotationLastSeenTimestamp] = startTime.Format(metrics.MetricsLastSeenTimestampFormat) return r.MemberClient.Update(ctx, endpointSlice) } + +// Join marks the joined status as true. +func (r *Reconciler) Join(_ context.Context) error { + if r.joined.Load() { + return nil + } + klog.InfoS("Mark the endpointSlice controller joined") + r.joined.Store(true) + return nil +} + +// Leave marks the joined status as false. +// When the controller is in the leave state, it will only handle the delete events. +func (r *Reconciler) Leave(_ context.Context) error { + if !r.joined.Load() { + return nil + } + klog.InfoS("Mark the endpointSlice controller left") + r.joined.Store(false) + return nil +} diff --git a/pkg/controllers/member/endpointslice/suite_test.go b/pkg/controllers/member/endpointslice/suite_test.go index c86688a3..b598f0a5 100644 --- a/pkg/controllers/member/endpointslice/suite_test.go +++ b/pkg/controllers/member/endpointslice/suite_test.go @@ -31,6 +31,7 @@ var ( hubClient client.Client ctx context.Context cancel context.CancelFunc + reconciler *Reconciler ) // setUpResources help set up resources in the test environment. @@ -99,13 +100,15 @@ var _ = BeforeSuite(func() { ctrlMgr, err := ctrl.NewManager(memberCfg, ctrl.Options{Scheme: scheme.Scheme}) Expect(err).NotTo(HaveOccurred()) - err = (&Reconciler{ + reconciler = &Reconciler{ MemberClusterID: memberClusterID, MemberClient: memberClient, HubClient: hubClient, HubNamespace: hubNSForMember, - }).SetupWithManager(ctx, ctrlMgr) + } + err = reconciler.SetupWithManager(ctx, ctrlMgr) Expect(err).NotTo(HaveOccurred()) + Expect(reconciler.Join(ctx)).Should(Succeed()) go func() { defer GinkgoRecover() @@ -116,6 +119,7 @@ var _ = BeforeSuite(func() { var _ = AfterSuite(func() { defer klog.Flush() + Expect(reconciler.Leave(ctx)).Should(Succeed()) cancel() By("tearing down the test environment") diff --git a/pkg/controllers/member/endpointsliceexport/controller.go b/pkg/controllers/member/endpointsliceexport/controller.go index af2c1990..8e46d810 100644 --- a/pkg/controllers/member/endpointsliceexport/controller.go +++ b/pkg/controllers/member/endpointsliceexport/controller.go @@ -125,3 +125,17 @@ func isEndpointSliceExportLinkedWithEndpointSlice(endpointSliceExport *fleetnetv } return true } + +// Join does nothing. +// There is no need to start or stop the controller as this controller is designed to clean up any invalid +// EndpointSliceExport in the hub cluster. +func (r *Reconciler) Join(_ context.Context) error { + // do nothing + return nil +} + +// Leave does nothing. +func (r *Reconciler) Leave(_ context.Context) error { + // do nothing + return nil +} diff --git a/pkg/controllers/member/endpointsliceimport/controller.go b/pkg/controllers/member/endpointsliceimport/controller.go index 2f86e69b..2bfd1563 100644 --- a/pkg/controllers/member/endpointsliceimport/controller.go +++ b/pkg/controllers/member/endpointsliceimport/controller.go @@ -10,6 +10,7 @@ package endpointsliceimport import ( "context" "fmt" + "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" @@ -82,6 +83,8 @@ type Reconciler struct { HubClient client.Client // The namespace reserved for fleet resources in the member cluster. FleetSystemNamespace string + // whether to start exporting an EndpointSlice + joined atomic.Bool } //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=endpointsliceimports,verbs=get;list;watch;update;patch @@ -130,6 +133,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{}, nil } + if !r.joined.Load() { + klog.V(2).InfoS("EndpointSliceImport controller is not started yet, requeue the request", "endpointSliceImport", endpointSliceImportRef) + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + // Import the EndpointSlice, or update an imported EndpointSlice. // Inquire the corresponding MCS to find out which Service the imported EndpointSlice should associate with. @@ -428,3 +436,24 @@ func (r *Reconciler) observeMetrics(ctx context.Context, endpointSliceImport *fl "isFirstImport", isFirstImport) return nil } + +// Join marks the joined status as true. +func (r *Reconciler) Join(_ context.Context) error { + if r.joined.Load() { + return nil + } + klog.InfoS("Mark the endpointSliceImport controller joined") + r.joined.Store(true) + return nil +} + +// Leave marks the joined status as false. +// When the controller is in the leave state, it will only handle the delete events. +func (r *Reconciler) Leave(_ context.Context) error { + if !r.joined.Load() { + return nil + } + klog.InfoS("Mark the endpointSliceImport controller left") + r.joined.Store(false) + return nil +} diff --git a/pkg/controllers/member/endpointsliceimport/suite_test.go b/pkg/controllers/member/endpointsliceimport/suite_test.go index 238fe80f..cf20be66 100644 --- a/pkg/controllers/member/endpointsliceimport/suite_test.go +++ b/pkg/controllers/member/endpointsliceimport/suite_test.go @@ -32,6 +32,7 @@ var ( hubClient client.Client ctx context.Context cancel context.CancelFunc + reconciler *Reconciler ) // setUpResources help set up resources in the test environment. @@ -116,12 +117,14 @@ var _ = BeforeSuite(func() { hubClient = hubCtrlMgr.GetClient() Expect(hubClient).NotTo(BeNil()) - err = (&Reconciler{ + reconciler = &Reconciler{ MemberClient: memberClient, HubClient: hubClient, FleetSystemNamespace: fleetSystemNS, - }).SetupWithManager(ctx, memberCtrlMgr, hubCtrlMgr) + } + err = reconciler.SetupWithManager(ctx, memberCtrlMgr, hubCtrlMgr) Expect(err).NotTo(HaveOccurred()) + Expect(reconciler.Join(ctx)).Should(Succeed()) go func() { defer GinkgoRecover() @@ -141,6 +144,7 @@ var _ = BeforeSuite(func() { var _ = AfterSuite(func() { defer klog.Flush() + Expect(reconciler.Leave(ctx)).Should(Succeed()) cancel() By("tearing down the test environment") diff --git a/pkg/controllers/member/internalmembercluster/v1alpha1/suite_test.go b/pkg/controllers/member/internalmembercluster/v1alpha1/suite_test.go index 5dba3352..057955fb 100644 --- a/pkg/controllers/member/internalmembercluster/v1alpha1/suite_test.go +++ b/pkg/controllers/member/internalmembercluster/v1alpha1/suite_test.go @@ -83,7 +83,7 @@ var _ = BeforeSuite(func() { filepath.Join("../../../../../", "config", "crd", "bases"), // need to make sure the version matches the one in the go.mod // workaround mentioned in https://github.com/kubernetes-sigs/controller-runtime/issues/1191 - filepath.Join(build.Default.GOPATH, "pkg", "mod", "go.goms.io", "fleet@v0.10.5", "config", "crd", "bases"), + filepath.Join(build.Default.GOPATH, "pkg", "mod", "go.goms.io", "fleet@v0.10.8", "config", "crd", "bases"), }, ErrorIfCRDPathMissing: true, } diff --git a/pkg/controllers/member/internalmembercluster/v1beta1/controller_v1beta1.go b/pkg/controllers/member/internalmembercluster/v1beta1/controller_v1beta1.go index caaef289..82041879 100644 --- a/pkg/controllers/member/internalmembercluster/v1beta1/controller_v1beta1.go +++ b/pkg/controllers/member/internalmembercluster/v1beta1/controller_v1beta1.go @@ -10,6 +10,7 @@ import ( "fmt" "time" + "golang.org/x/sync/errgroup" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -20,6 +21,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" + "go.goms.io/fleet/pkg/utils/controller" fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1" "go.goms.io/fleet-networking/pkg/common/apiretry" @@ -38,6 +40,8 @@ type Reconciler struct { MemberClient client.Client HubClient client.Client AgentType clusterv1beta1.AgentType + + Controllers []controller.MemberController } //+kubebuilder:rbac:groups=fleet.azure.com,resources=internalmemberclusters,verbs=get;list;watch @@ -71,6 +75,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu case clusterv1beta1.ClusterStateLeave: // The member cluster is leaving the fleet. klog.V(2).InfoS("member cluster has left the fleet; performing cleanup", "internalMemberCluster", imcKRef) + if err := r.stopControllers(ctx); err != nil { + klog.ErrorS(err, "Failed to stop member controllers", "internalMemberCluster", imcKRef) + return ctrl.Result{}, err + } // Clean up fleet networking related resources. if r.AgentType == clusterv1beta1.MultiClusterServiceAgent { @@ -87,6 +95,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // Update the agent status. return ctrl.Result{}, r.updateAgentStatus(ctx, &imc) case clusterv1beta1.ClusterStateJoin: + if err := r.startControllers(ctx); err != nil { + klog.ErrorS(err, "Failed to start member controllers", "internalMemberCluster", imcKRef) + return ctrl.Result{}, err + } + // The member cluster still has an active membership in the fleet; update the agent status. if err := r.updateAgentStatus(ctx, &imc); err != nil { return ctrl.Result{}, err @@ -223,6 +236,28 @@ func (r *Reconciler) cleanupServiceExportRelatedResources(ctx context.Context) e return nil } +func (r *Reconciler) startControllers(ctx context.Context) error { + errs, cctx := errgroup.WithContext(ctx) + for i := range r.Controllers { + c := r.Controllers[i] + errs.Go(func() error { + return c.Join(cctx) + }) + } + return errs.Wait() +} + +func (r *Reconciler) stopControllers(ctx context.Context) error { + errs, cctx := errgroup.WithContext(ctx) + for i := range r.Controllers { + c := r.Controllers[i] + errs.Go(func() error { + return c.Leave(cctx) + }) + } + return errs.Wait() +} + // SetupWithManager sets up the controller with the Manager. func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/pkg/controllers/member/internalmembercluster/v1beta1/suite_test.go b/pkg/controllers/member/internalmembercluster/v1beta1/suite_test.go index 0c32f679..c9fb96a2 100644 --- a/pkg/controllers/member/internalmembercluster/v1beta1/suite_test.go +++ b/pkg/controllers/member/internalmembercluster/v1beta1/suite_test.go @@ -80,7 +80,7 @@ var _ = BeforeSuite(func() { CRDDirectoryPaths: []string{ filepath.Join("..", "..", "..", "..", "..", "config", "crd", "bases"), // The package name must match with the version of the fleet package in use. - filepath.Join(build.Default.GOPATH, "pkg", "mod", "go.goms.io", "fleet@v0.10.5", "config", "crd", "bases"), + filepath.Join(build.Default.GOPATH, "pkg", "mod", "go.goms.io", "fleet@v0.10.8", "config", "crd", "bases"), }, ErrorIfCRDPathMissing: true, diff --git a/pkg/controllers/member/internalserviceexport/controller.go b/pkg/controllers/member/internalserviceexport/controller.go index 9a278418..d5b7fb08 100644 --- a/pkg/controllers/member/internalserviceexport/controller.go +++ b/pkg/controllers/member/internalserviceexport/controller.go @@ -10,6 +10,7 @@ package internalserviceexport import ( "context" "reflect" + "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" @@ -71,6 +72,9 @@ type Reconciler struct { MemberClient client.Client HubClient client.Client Recorder record.EventRecorder + + // whether to start exporting an EndpointSlice + joined atomic.Bool } //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=internalserviceexports,verbs=get;list;watch;create;update;patch;delete @@ -129,6 +133,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{}, err } + if !r.joined.Load() { + klog.V(2).InfoS("InternalServiceExport controller is not started yet, requeue the request", "internalServiceExport", internalSvcExportRef) + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + // Report back conflict resolution result. klog.V(4).InfoS("Report back conflict resolution result", "internalServiceExport", internalSvcExportRef) reported, err := r.reportBackConflictCondition(ctx, &svcExport, &internalSvcExport) @@ -249,3 +258,24 @@ func (r *Reconciler) observeMetrics(ctx context.Context, "originClusterID", r.MemberClusterID) return nil } + +// Join marks the joined status as true. +func (r *Reconciler) Join(_ context.Context) error { + if r.joined.Load() { + return nil + } + klog.InfoS("Mark the internalServiceExport controller joined") + r.joined.Store(true) + return nil +} + +// Leave marks the joined status as false. +// When the controller is in the leave state, it will only delete any orphan resources. +func (r *Reconciler) Leave(_ context.Context) error { + if !r.joined.Load() { + return nil + } + klog.InfoS("Mark the internalServiceExport controller left") + r.joined.Store(false) + return nil +} diff --git a/pkg/controllers/member/internalserviceexport/suite_test.go b/pkg/controllers/member/internalserviceexport/suite_test.go index 5e787a9a..36a98936 100644 --- a/pkg/controllers/member/internalserviceexport/suite_test.go +++ b/pkg/controllers/member/internalserviceexport/suite_test.go @@ -33,6 +33,7 @@ var ( hubClient client.Client ctx context.Context cancel context.CancelFunc + reconciler *Reconciler ) // setUpResources help set up resources in the test environment. @@ -110,12 +111,14 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred()) - err = (&Reconciler{ + reconciler = &Reconciler{ MemberClient: memberClient, HubClient: hubClient, Recorder: ctrlMgr.GetEventRecorderFor(ControllerName), // in main func, we use member controller manager instead - }).SetupWithManager(ctrlMgr) + } + err = reconciler.SetupWithManager(ctrlMgr) Expect(err).NotTo(HaveOccurred()) + Expect(reconciler.Join(ctx)).Should(Succeed()) go func() { defer GinkgoRecover() @@ -126,6 +129,7 @@ var _ = BeforeSuite(func() { var _ = AfterSuite(func() { defer klog.Flush() + Expect(reconciler.Leave(ctx)).Should(Succeed()) cancel() By("tearing down the test environment") diff --git a/pkg/controllers/member/internalserviceimport/controller.go b/pkg/controllers/member/internalserviceimport/controller.go index 4246dd81..78b357e5 100644 --- a/pkg/controllers/member/internalserviceimport/controller.go +++ b/pkg/controllers/member/internalserviceimport/controller.go @@ -9,6 +9,7 @@ package internalserviceimport import ( "context" + "sync/atomic" "time" "k8s.io/apimachinery/pkg/api/equality" @@ -25,6 +26,8 @@ import ( type Reconciler struct { MemberClient client.Client HubClient client.Client + // whether to start exporting an EndpointSlice + joined atomic.Bool } //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=internalserviceimports,verbs=get;list;watch;delete @@ -78,6 +81,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{}, err } + if !r.joined.Load() { + klog.V(2).InfoS("InternalServiceImport controller is not started yet, requeue the request", "internalServiceImport", internalSvcImportKRef) + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + // no status change if equality.Semantic.DeepEqual(internalSvcImport.Status, serviceImport.Status) { return ctrl.Result{}, nil @@ -102,3 +110,24 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { For(&fleetnetv1alpha1.InternalServiceImport{}). Complete(r) } + +// Join marks the joined status as true. +func (r *Reconciler) Join(_ context.Context) error { + if r.joined.Load() { + return nil + } + klog.InfoS("Mark the internalServiceImport controller joined") + r.joined.Store(true) + return nil +} + +// Leave marks the joined status as false. +// When the controller is in the leave state, it will only delete any orphan resources. +func (r *Reconciler) Leave(_ context.Context) error { + if !r.joined.Load() { + return nil + } + klog.InfoS("Mark the internalServiceImport controller left") + r.joined.Store(false) + return nil +} diff --git a/pkg/controllers/member/internalserviceimport/suite_test.go b/pkg/controllers/member/internalserviceimport/suite_test.go index 6440b86a..5ab4b760 100644 --- a/pkg/controllers/member/internalserviceimport/suite_test.go +++ b/pkg/controllers/member/internalserviceimport/suite_test.go @@ -40,6 +40,7 @@ var ( hubTestEnv *envtest.Environment ctx context.Context cancel context.CancelFunc + reconciler *Reconciler ) const ( @@ -120,11 +121,13 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred()) - err = (&Reconciler{ + reconciler = &Reconciler{ MemberClient: memberClient, HubClient: hubClient, - }).SetupWithManager(mgr) + } + err = reconciler.SetupWithManager(mgr) Expect(err).ToNot(HaveOccurred()) + Expect(reconciler.Join(ctx)).Should(Succeed()) ctx, cancel = context.WithCancel(context.TODO()) go func() { @@ -136,6 +139,7 @@ var _ = BeforeSuite(func() { var _ = AfterSuite(func() { defer klog.Flush() + Expect(reconciler.Leave(ctx)).Should(Succeed()) cancel() By("tearing down the test environment") diff --git a/pkg/controllers/member/serviceexport/controller.go b/pkg/controllers/member/serviceexport/controller.go index adbf83bf..57bffd0f 100644 --- a/pkg/controllers/member/serviceexport/controller.go +++ b/pkg/controllers/member/serviceexport/controller.go @@ -11,6 +11,7 @@ import ( "context" "errors" "fmt" + "sync/atomic" "time" corev1 "k8s.io/api/core/v1" @@ -52,6 +53,8 @@ type Reconciler struct { // The namespace reserved for the current member cluster in the hub cluster. HubNamespace string Recorder record.EventRecorder + // whether to start exporting an EndpointSlice + joined atomic.Bool } //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceexports,verbs=get;list;watch;create;update;patch;delete @@ -103,6 +106,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{}, nil } + if !r.joined.Load() { + klog.V(2).InfoS("ServiceExport controller is not started yet, requeue the request", "service", svcRef) + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + // Check if the Service to export exists. svc := corev1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -412,3 +420,24 @@ func (r *Reconciler) annotateLastSeenResourceVersionAndTimestamp(ctx context.Con svcExport.Annotations[metrics.MetricsAnnotationLastSeenTimestamp] = startTime.Format(metrics.MetricsLastSeenTimestampFormat) return r.MemberClient.Update(ctx, svcExport) } + +// Join marks the joined status as true. +func (r *Reconciler) Join(_ context.Context) error { + if r.joined.Load() { + return nil + } + klog.InfoS("Mark the serviceExport controller joined") + r.joined.Store(true) + return nil +} + +// Leave marks the joined status as false. +// When the controller is in the leave state, it will only handle the delete events. +func (r *Reconciler) Leave(_ context.Context) error { + if !r.joined.Load() { + return nil + } + klog.InfoS("Mark the serviceExport controller left") + r.joined.Store(false) + return nil +} diff --git a/pkg/controllers/member/serviceexport/suite_test.go b/pkg/controllers/member/serviceexport/suite_test.go index e01e24a5..12da2dbe 100644 --- a/pkg/controllers/member/serviceexport/suite_test.go +++ b/pkg/controllers/member/serviceexport/suite_test.go @@ -32,6 +32,7 @@ var ( hubClient client.Client ctx context.Context cancel context.CancelFunc + reconciler *Reconciler ) // setUpResources help set up resources in the test environment. @@ -105,14 +106,16 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred()) - err = (&Reconciler{ + reconciler = &Reconciler{ MemberClusterID: memberClusterID, MemberClient: memberClient, HubClient: hubClient, HubNamespace: hubNSForMember, Recorder: ctrlMgr.GetEventRecorderFor(ControllerName), - }).SetupWithManager(ctrlMgr) + } + err = reconciler.SetupWithManager(ctrlMgr) Expect(err).NotTo(HaveOccurred()) + Expect(reconciler.Join(ctx)).Should(Succeed()) go func() { defer GinkgoRecover() @@ -123,6 +126,7 @@ var _ = BeforeSuite(func() { var _ = AfterSuite(func() { defer klog.Flush() + Expect(reconciler.Leave(ctx)).Should(Succeed()) cancel() By("tearing down the test environment") diff --git a/pkg/controllers/member/serviceimport/controller.go b/pkg/controllers/member/serviceimport/controller.go index 1680991e..e0ef63a4 100644 --- a/pkg/controllers/member/serviceimport/controller.go +++ b/pkg/controllers/member/serviceimport/controller.go @@ -10,6 +10,7 @@ package serviceimport import ( "context" "fmt" + "sync/atomic" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -36,6 +37,9 @@ type Reconciler struct { HubClient client.Client MemberClient client.Client + + // whether to start exporting an EndpointSlice + joined atomic.Bool } //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceimports,verbs=get;list;watch;update;patch @@ -95,6 +99,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{}, nil } + if !r.joined.Load() { + klog.V(2).InfoS("ServiceImport controller is not started yet, requeue the request", "serviceImport", serviceImportRef) + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + // Add finalizer when it's in service import when not being deleted if !controllerutil.ContainsFinalizer(serviceImport, ServiceImportFinalizer) { controllerutil.AddFinalizer(serviceImport, ServiceImportFinalizer) @@ -134,3 +143,24 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { func formatInternalServiceImportName(serviceImport *fleetnetv1alpha1.ServiceImport) string { return fmt.Sprintf("%s-%s", serviceImport.Namespace, serviceImport.Name) } + +// Join marks the joined status as true. +func (r *Reconciler) Join(_ context.Context) error { + if r.joined.Load() { + return nil + } + klog.InfoS("Mark the serviceImport controller joined") + r.joined.Store(true) + return nil +} + +// Leave marks the joined status as false. +// When the controller is in the leave state, it will only handle the delete events. +func (r *Reconciler) Leave(_ context.Context) error { + if !r.joined.Load() { + return nil + } + klog.InfoS("Mark the serviceImport controller left") + r.joined.Store(false) + return nil +} diff --git a/pkg/controllers/member/serviceimport/suite_test.go b/pkg/controllers/member/serviceimport/suite_test.go index 47c10947..24ca6b7c 100644 --- a/pkg/controllers/member/serviceimport/suite_test.go +++ b/pkg/controllers/member/serviceimport/suite_test.go @@ -38,6 +38,7 @@ var ( hubClient client.Client ctx context.Context cancel context.CancelFunc + reconciler *Reconciler ) const ( @@ -100,13 +101,15 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred()) - err = (&Reconciler{ + reconciler = &Reconciler{ MemberClusterID: MemberClusterID, HubNamespace: HubNamespace, MemberClient: memberClient, HubClient: hubClient, - }).SetupWithManager(mgr) + } + err = reconciler.SetupWithManager(mgr) Expect(err).ToNot(HaveOccurred()) + Expect(reconciler.Join(ctx)).Should(Succeed()) setupResources() @@ -131,6 +134,7 @@ func setupResources() { var _ = AfterSuite(func() { defer klog.Flush() + Expect(reconciler.Leave(ctx)).Should(Succeed()) cancel() By("tearing down the test environment") diff --git a/pkg/controllers/multiclusterservice/controller.go b/pkg/controllers/multiclusterservice/controller.go index ecd9d1c1..7b081c09 100644 --- a/pkg/controllers/multiclusterservice/controller.go +++ b/pkg/controllers/multiclusterservice/controller.go @@ -11,6 +11,7 @@ import ( "context" "fmt" "strconv" + "sync/atomic" "time" corev1 "k8s.io/api/core/v1" @@ -63,6 +64,7 @@ type Reconciler struct { Scheme *runtime.Scheme FleetSystemNamespace string // reserved fleet namespace Recorder record.EventRecorder + joined atomic.Bool } //+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete @@ -98,6 +100,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return r.handleDelete(ctx, &mcs) } + if !r.joined.Load() { + klog.V(2).InfoS("MultiClusterService controller is not started yet, requeue the request", "multiClusterService", mcsKRef) + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + } + // register finalizer if !controllerutil.ContainsFinalizer(&mcs, multiClusterServiceFinalizer) { controllerutil.AddFinalizer(&mcs, multiClusterServiceFinalizer) @@ -447,3 +454,24 @@ func (r *Reconciler) serviceEventHandler() handler.MapFunc { } } } + +// Join marks the joined status as true. +func (r *Reconciler) Join(_ context.Context) error { + if r.joined.Load() { + return nil + } + klog.InfoS("Mark the multiClusterService controller joined") + r.joined.Store(true) + return nil +} + +// Leave marks the joined status as false. +// When the controller is in the leave state, it will only handle the delete events. +func (r *Reconciler) Leave(_ context.Context) error { + if !r.joined.Load() { + return nil + } + klog.InfoS("Mark the multiClusterService controller left") + r.joined.Store(false) + return nil +} diff --git a/pkg/controllers/multiclusterservice/suite_test.go b/pkg/controllers/multiclusterservice/suite_test.go index da770c3c..345c2a6f 100644 --- a/pkg/controllers/multiclusterservice/suite_test.go +++ b/pkg/controllers/multiclusterservice/suite_test.go @@ -31,12 +31,13 @@ import ( ) var ( - cfg *rest.Config - mgr manager.Manager - k8sClient client.Client - testEnv *envtest.Environment - ctx context.Context - cancel context.CancelFunc + cfg *rest.Config + mgr manager.Manager + k8sClient client.Client + testEnv *envtest.Environment + ctx context.Context + cancel context.CancelFunc + reconciler *Reconciler ) func TestAPIs(t *testing.T) { @@ -82,13 +83,15 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred()) - err = (&Reconciler{ + reconciler = &Reconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), FleetSystemNamespace: "fleet-system", Recorder: mgr.GetEventRecorderFor(ControllerName), - }).SetupWithManager(mgr) + } + err = reconciler.SetupWithManager(mgr) Expect(err).ToNot(HaveOccurred()) + Expect(reconciler.Join(ctx)).Should(Succeed()) ctx, cancel = context.WithCancel(context.TODO()) @@ -117,6 +120,7 @@ var _ = BeforeSuite(func() { var _ = AfterSuite(func() { defer klog.Flush() + Expect(reconciler.Leave(ctx)).Should(Succeed()) By("delete multiClusterService namespace") ns := corev1.Namespace{ diff --git a/test/scripts/bootstrap.sh b/test/scripts/bootstrap.sh index 7a303905..4398050d 100644 --- a/test/scripts/bootstrap.sh +++ b/test/scripts/bootstrap.sh @@ -168,7 +168,7 @@ fi kubectl config use-context $HUB_CLUSTER-admin # need to make sure the version matches the one in the go.mod # workaround mentioned in https://github.com/kubernetes-sigs/controller-runtime/issues/1191 -kubectl apply -f `go env GOPATH`/pkg/mod/go.goms.io/fleet@v0.10.5/config/crd/bases/fleet.azure.com_internalmemberclusters.yaml +kubectl apply -f `go env GOPATH`/pkg/mod/go.goms.io/fleet@v0.10.8/config/crd/bases/fleet.azure.com_internalmemberclusters.yaml kubectl apply -f config/crd/* helm install hub-net-controller-manager \ ./charts/hub-net-controller-manager/ \