From e342eab209bad4a9f74a3d650bdf59b2a5317a9b Mon Sep 17 00:00:00 2001 From: Daniel Valdivia <18384552+dvaldivia@users.noreply.github.com> Date: Mon, 8 Nov 2021 19:34:51 -0800 Subject: [PATCH] Support watching multiple Namespaces (#897) * Support watching multiple Namespaces Signed-off-by: Daniel Valdivia <18384552+dvaldivia@users.noreply.github.com> * Update pkg/controller/cluster/main-controller.go Co-authored-by: Harshavardhana Co-authored-by: Harshavardhana --- main.go | 44 +++++++++++++++-------- pkg/controller/cluster/main-controller.go | 20 ++++++++++- 2 files changed, 48 insertions(+), 16 deletions(-) diff --git a/main.go b/main.go index 0f5815eccf7..5453f98085b 100644 --- a/main.go +++ b/main.go @@ -26,9 +26,12 @@ import ( "fmt" "os" "os/signal" + "strings" "syscall" "time" + "github.com/minio/minio-go/v7/pkg/set" + "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/rest" @@ -114,7 +117,19 @@ func main() { klog.Errorf("Error building Prometheus clientset: %v", err.Error()) } - namespace, isNamespaced := os.LookupEnv("WATCHED_NAMESPACE") + // Get a comma separated list of namespaces to watch + namespacesENv, isNamespaced := os.LookupEnv("WATCHED_NAMESPACE") + var namespaces set.StringSet + if isNamespaced { + namespaces = set.NewStringSet() + rawNamespaces := strings.Split(namespacesENv, ",") + for _, nsStr := range rawNamespaces { + if nsStr != "" { + namespaces.Add(strings.TrimSpace(nsStr)) + } + } + klog.Infof("Watching only namespaces: %s", strings.Join(namespaces.ToSlice(), ",")) + } ctx := context.Background() var caContent []byte @@ -145,24 +160,21 @@ func main() { klog.Info("WARNING: Could not read ca.crt from the pod") } - var kubeInformerFactory kubeinformers.SharedInformerFactory - var minioInformerFactory informers.SharedInformerFactory - var promInformerFactory prominformers.SharedInformerFactory - if isNamespaced { - kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, time.Second*30, kubeinformers.WithNamespace(namespace)) - minioInformerFactory = informers.NewSharedInformerFactoryWithOptions(controllerClient, time.Second*30, informers.WithNamespace(namespace)) - promInformerFactory = prominformers.NewSharedInformerFactoryWithOptions(promClient, time.Second*30, prominformers.WithNamespace(namespace)) - } else { - kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) - minioInformerFactory = informers.NewSharedInformerFactory(controllerClient, time.Second*30) - promInformerFactory = prominformers.NewSharedInformerFactory(promClient, time.Second*30) - } + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) + minioInformerFactory := informers.NewSharedInformerFactory(controllerClient, time.Second*30) + promInformerFactory := prominformers.NewSharedInformerFactory(promClient, time.Second*30) + podName := os.Getenv("HOSTNAME") if podName == "" { podName = "operator-pod" } - mainController := cluster.NewController(podName, kubeClient, controllerClient, promClient, + mainController := cluster.NewController( + podName, + namespaces, + kubeClient, + controllerClient, + promClient, kubeInformerFactory.Apps().V1().StatefulSets(), kubeInformerFactory.Apps().V1().Deployments(), kubeInformerFactory.Core().V1().Pods(), @@ -170,7 +182,9 @@ func main() { minioInformerFactory.Minio().V2().Tenants(), kubeInformerFactory.Core().V1().Services(), promInformerFactory.Monitoring().V1().ServiceMonitors(), - hostsTemplate, version) + hostsTemplate, + version, + ) go kubeInformerFactory.Start(stopCh) go minioInformerFactory.Start(stopCh) diff --git a/pkg/controller/cluster/main-controller.go b/pkg/controller/cluster/main-controller.go index 116ea4852b3..5a944b01119 100644 --- a/pkg/controller/cluster/main-controller.go +++ b/pkg/controller/cluster/main-controller.go @@ -29,6 +29,9 @@ import ( "syscall" "time" + "github.com/minio/minio-go/v7/pkg/set" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -135,6 +138,8 @@ var ErrLogSearchNotReady = fmt.Errorf("Log Search is not ready") type Controller struct { // podName is the identifier of this instance podName string + // namespacesToWatch restricts the action of the opreator to a list of namespaces + namespacesToWatch set.StringSet // kubeClientSet is a standard kubernetes clientset kubeClientSet kubernetes.Interface // minioClientSet is a clientset for our own API group @@ -211,7 +216,7 @@ type Controller struct { } // NewController returns a new sample controller -func NewController(podName string, kubeClientSet kubernetes.Interface, minioClientSet clientset.Interface, promClient promclientset.Interface, statefulSetInformer appsinformers.StatefulSetInformer, deploymentInformer appsinformers.DeploymentInformer, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, tenantInformer informers.TenantInformer, serviceInformer coreinformers.ServiceInformer, serviceMonitorInformer prominformers.ServiceMonitorInformer, hostsTemplate, operatorVersion string) *Controller { +func NewController(podName string, namespacesToWatch set.StringSet, kubeClientSet kubernetes.Interface, minioClientSet clientset.Interface, promClient promclientset.Interface, statefulSetInformer appsinformers.StatefulSetInformer, deploymentInformer appsinformers.DeploymentInformer, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, tenantInformer informers.TenantInformer, serviceInformer coreinformers.ServiceInformer, serviceMonitorInformer prominformers.ServiceMonitorInformer, hostsTemplate, operatorVersion string) *Controller { // Create event broadcaster // Add minio-controller types to the default Kubernetes Scheme so Events can be @@ -225,6 +230,7 @@ func NewController(podName string, kubeClientSet kubernetes.Interface, minioClie controller := &Controller{ podName: podName, + namespacesToWatch: namespacesToWatch, kubeClientSet: kubeClientSet, minioClientSet: minioClientSet, promClient: promClient, @@ -1299,6 +1305,18 @@ func (c *Controller) enqueueTenant(obj interface{}) { runtime.HandleError(err) return } + if !c.namespacesToWatch.IsEmpty() { + meta, err := meta.Accessor(obj) + if err != nil { + runtime.HandleError(err) + return + } + if !c.namespacesToWatch.Contains(meta.GetNamespace()) { + klog.Infof("Ignoring tenant `%s` in namespace that is not watched by this controller.", key) + return + } + } + c.workqueue.AddRateLimited(key) }