Skip to content

Commit

Permalink
Support watching multiple Namespaces (#897)
Browse files Browse the repository at this point in the history
* Support watching multiple Namespaces

Signed-off-by: Daniel Valdivia <[email protected]>

* Update pkg/controller/cluster/main-controller.go

Co-authored-by: Harshavardhana <[email protected]>

Co-authored-by: Harshavardhana <[email protected]>
  • Loading branch information
dvaldivia and harshavardhana authored Nov 9, 2021
1 parent b3fe304 commit e342eab
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 16 deletions.
44 changes: 29 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import (
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/minio/minio-go/v7/pkg/set"

"k8s.io/client-go/tools/clientcmd"

"k8s.io/client-go/rest"
Expand Down Expand Up @@ -114,7 +117,19 @@ func main() {
klog.Errorf("Error building Prometheus clientset: %v", err.Error())
}

namespace, isNamespaced := os.LookupEnv("WATCHED_NAMESPACE")
// Get a comma separated list of namespaces to watch
namespacesENv, isNamespaced := os.LookupEnv("WATCHED_NAMESPACE")
var namespaces set.StringSet
if isNamespaced {
namespaces = set.NewStringSet()
rawNamespaces := strings.Split(namespacesENv, ",")
for _, nsStr := range rawNamespaces {
if nsStr != "" {
namespaces.Add(strings.TrimSpace(nsStr))
}
}
klog.Infof("Watching only namespaces: %s", strings.Join(namespaces.ToSlice(), ","))
}

ctx := context.Background()
var caContent []byte
Expand Down Expand Up @@ -145,32 +160,31 @@ func main() {
klog.Info("WARNING: Could not read ca.crt from the pod")
}

var kubeInformerFactory kubeinformers.SharedInformerFactory
var minioInformerFactory informers.SharedInformerFactory
var promInformerFactory prominformers.SharedInformerFactory
if isNamespaced {
kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, time.Second*30, kubeinformers.WithNamespace(namespace))
minioInformerFactory = informers.NewSharedInformerFactoryWithOptions(controllerClient, time.Second*30, informers.WithNamespace(namespace))
promInformerFactory = prominformers.NewSharedInformerFactoryWithOptions(promClient, time.Second*30, prominformers.WithNamespace(namespace))
} else {
kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
minioInformerFactory = informers.NewSharedInformerFactory(controllerClient, time.Second*30)
promInformerFactory = prominformers.NewSharedInformerFactory(promClient, time.Second*30)
}
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
minioInformerFactory := informers.NewSharedInformerFactory(controllerClient, time.Second*30)
promInformerFactory := prominformers.NewSharedInformerFactory(promClient, time.Second*30)

podName := os.Getenv("HOSTNAME")
if podName == "" {
podName = "operator-pod"
}

mainController := cluster.NewController(podName, kubeClient, controllerClient, promClient,
mainController := cluster.NewController(
podName,
namespaces,
kubeClient,
controllerClient,
promClient,
kubeInformerFactory.Apps().V1().StatefulSets(),
kubeInformerFactory.Apps().V1().Deployments(),
kubeInformerFactory.Core().V1().Pods(),
kubeInformerFactory.Batch().V1().Jobs(),
minioInformerFactory.Minio().V2().Tenants(),
kubeInformerFactory.Core().V1().Services(),
promInformerFactory.Monitoring().V1().ServiceMonitors(),
hostsTemplate, version)
hostsTemplate,
version,
)

go kubeInformerFactory.Start(stopCh)
go minioInformerFactory.Start(stopCh)
Expand Down
20 changes: 19 additions & 1 deletion pkg/controller/cluster/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"syscall"
"time"

"github.com/minio/minio-go/v7/pkg/set"
"k8s.io/apimachinery/pkg/api/meta"

"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

Expand Down Expand Up @@ -135,6 +138,8 @@ var ErrLogSearchNotReady = fmt.Errorf("Log Search is not ready")
type Controller struct {
// podName is the identifier of this instance
podName string
// namespacesToWatch restricts the action of the opreator to a list of namespaces
namespacesToWatch set.StringSet
// kubeClientSet is a standard kubernetes clientset
kubeClientSet kubernetes.Interface
// minioClientSet is a clientset for our own API group
Expand Down Expand Up @@ -211,7 +216,7 @@ type Controller struct {
}

// NewController returns a new sample controller
func NewController(podName string, kubeClientSet kubernetes.Interface, minioClientSet clientset.Interface, promClient promclientset.Interface, statefulSetInformer appsinformers.StatefulSetInformer, deploymentInformer appsinformers.DeploymentInformer, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, tenantInformer informers.TenantInformer, serviceInformer coreinformers.ServiceInformer, serviceMonitorInformer prominformers.ServiceMonitorInformer, hostsTemplate, operatorVersion string) *Controller {
func NewController(podName string, namespacesToWatch set.StringSet, kubeClientSet kubernetes.Interface, minioClientSet clientset.Interface, promClient promclientset.Interface, statefulSetInformer appsinformers.StatefulSetInformer, deploymentInformer appsinformers.DeploymentInformer, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, tenantInformer informers.TenantInformer, serviceInformer coreinformers.ServiceInformer, serviceMonitorInformer prominformers.ServiceMonitorInformer, hostsTemplate, operatorVersion string) *Controller {

// Create event broadcaster
// Add minio-controller types to the default Kubernetes Scheme so Events can be
Expand All @@ -225,6 +230,7 @@ func NewController(podName string, kubeClientSet kubernetes.Interface, minioClie

controller := &Controller{
podName: podName,
namespacesToWatch: namespacesToWatch,
kubeClientSet: kubeClientSet,
minioClientSet: minioClientSet,
promClient: promClient,
Expand Down Expand Up @@ -1299,6 +1305,18 @@ func (c *Controller) enqueueTenant(obj interface{}) {
runtime.HandleError(err)
return
}
if !c.namespacesToWatch.IsEmpty() {
meta, err := meta.Accessor(obj)
if err != nil {
runtime.HandleError(err)
return
}
if !c.namespacesToWatch.Contains(meta.GetNamespace()) {
klog.Infof("Ignoring tenant `%s` in namespace that is not watched by this controller.", key)
return
}
}

c.workqueue.AddRateLimited(key)
}

Expand Down

0 comments on commit e342eab

Please sign in to comment.