From c95d930830912ceef3516b46994508c56214d236 Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Mon, 14 Oct 2024 22:33:03 -0700 Subject: [PATCH] feat: support multiple controller with instance config (#2153) Signed-off-by: Derek Wang --- .../namespaced-controller-wo-crds.yaml | 2 ++ .../numaflow-controller-config.yaml | 2 ++ config/install.yaml | 2 ++ config/namespace-install.yaml | 2 ++ docs/operations/installation.md | 26 ++++++++++++++ .../numaflow-controller-config.yaml | 4 +++ go.mod | 4 +-- go.sum | 8 ++--- pkg/apis/numaflow/v1alpha1/const.go | 3 +- pkg/reconciler/config.go | 7 ++++ pkg/reconciler/config_test.go | 34 +++++++++++++++++++ pkg/reconciler/isbsvc/controller.go | 8 ++++- pkg/reconciler/monovertex/controller.go | 4 +++ pkg/reconciler/pipeline/controller.go | 12 +++++++ pkg/reconciler/vertex/controller.go | 8 +++++ 15 files changed, 118 insertions(+), 8 deletions(-) diff --git a/config/advanced-install/namespaced-controller-wo-crds.yaml b/config/advanced-install/namespaced-controller-wo-crds.yaml index 20ca3f2913..487bb74249 100644 --- a/config/advanced-install/namespaced-controller-wo-crds.yaml +++ b/config/advanced-install/namespaced-controller-wo-crds.yaml @@ -137,6 +137,8 @@ metadata: apiVersion: v1 data: controller-config.yaml: | + # "instance" configuration can be used to run multiple Numaflow controllers, check details at https://numaflow.numaproj.io/operations/installation/#multiple-controllers + instance: "" defaults: containerResources: | requests: diff --git a/config/base/controller-manager/numaflow-controller-config.yaml b/config/base/controller-manager/numaflow-controller-config.yaml index eaf49cd124..2164f116d8 100644 --- a/config/base/controller-manager/numaflow-controller-config.yaml +++ b/config/base/controller-manager/numaflow-controller-config.yaml @@ -4,6 +4,8 @@ metadata: name: numaflow-controller-config data: controller-config.yaml: |+ + # "instance" configuration can be used to run multiple Numaflow controllers, check details at 
https://numaflow.numaproj.io/operations/installation/#multiple-controllers + instance: "" defaults: containerResources: | requests: diff --git a/config/install.yaml b/config/install.yaml index a130e9c144..7acb3d54a0 100644 --- a/config/install.yaml +++ b/config/install.yaml @@ -28252,6 +28252,8 @@ metadata: apiVersion: v1 data: controller-config.yaml: | + # "instance" configuration can be used to run multiple Numaflow controllers, check details at https://numaflow.numaproj.io/operations/installation/#multiple-controllers + instance: "" defaults: containerResources: | requests: diff --git a/config/namespace-install.yaml b/config/namespace-install.yaml index 38053d2bfc..e97e13ba1d 100644 --- a/config/namespace-install.yaml +++ b/config/namespace-install.yaml @@ -28143,6 +28143,8 @@ metadata: apiVersion: v1 data: controller-config.yaml: | + # "instance" configuration can be used to run multiple Numaflow controllers, check details at https://numaflow.numaproj.io/operations/installation/#multiple-controllers + instance: "" defaults: containerResources: | requests: diff --git a/docs/operations/installation.md b/docs/operations/installation.md index d92d1cfa06..fcffdb6f55 100644 --- a/docs/operations/installation.md +++ b/docs/operations/installation.md @@ -138,3 +138,29 @@ data: ``` If HA is turned off, the controller deployment should not run with multiple replicas. + +## Multiple Controllers + +Within one cluster, or even in one namespace, you can run multiple Numaflow controllers by leveraging the `instance` configuration in the `numaflow-controller-config` ConfigMap. + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: numaflow-controller-config +data: + controller-config.yaml: | + # Within a cluster, setting "instance" can be used to run N Numaflow controllers. + # If configured, the controller will only watch the objects having an annotation with the key "numaflow.numaproj.io/instance" and the corresponding value. 
+ # If not configured (or empty string), the controller will watch all objects. + instance: "" + defaults: + containerResources: | + requests: + memory: "128Mi" + cpu: "100m" + isbsvc: + ... +``` + +When `instance` is configured (e.g. `my-instance`), the controller will only watch the objects (`InterStepBufferService`, `Pipeline` and `MonoVertex`) having the annotation `numaflow.numaproj.io/instance: my-instance`. Correspondingly, if a `Pipeline` object has an annotation `numaflow.numaproj.io/instance: my-instance`, the referenced `InterStepBufferService` must also have the same annotation, or the controller will fail to orchestrate the pipeline. diff --git a/docs/operations/numaflow-controller-config.yaml b/docs/operations/numaflow-controller-config.yaml index e8e6a8af19..734e6981e4 100644 --- a/docs/operations/numaflow-controller-config.yaml +++ b/docs/operations/numaflow-controller-config.yaml @@ -4,6 +4,10 @@ metadata: name: numaflow-controller-config data: controller-config.yaml: | + # Within a cluster, setting "instance" can be used to run N Numaflow controllers. + # If configured, the controller will only watch the objects having an annotation with the key "numaflow.numaproj.io/instance" and the corresponding value. + # If not configured (or empty string), the controller will watch all objects. 
+ instance: "" defaults: containerResources: | requests: diff --git a/go.mod b/go.mod index 02e20f7a35..a40001aeb8 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/numaproj/numaflow go 1.23.1 require ( - github.com/IBM/sarama v1.43.2 + github.com/IBM/sarama v1.43.3 github.com/Masterminds/semver/v3 v3.3.0 github.com/Masterminds/sprig/v3 v3.2.3 github.com/ahmetb/gen-crd-api-reference-docs v0.3.0 @@ -85,7 +85,7 @@ require ( github.com/chenzhuoyu/iasm v0.9.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/eapache/go-resiliency v1.6.0 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect diff --git a/go.sum b/go.sum index 8df905cd8e..c5f73afc67 100644 --- a/go.sum +++ b/go.sum @@ -46,8 +46,8 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= -github.com/IBM/sarama v1.43.2 h1:HABeEqRUh32z8yzY2hGB/j8mHSzC/HA9zlEjqFNCzSw= -github.com/IBM/sarama v1.43.2/go.mod h1:Kyo4WkF24Z+1nz7xeVUFWIuKVV8RS3wM8mkvPKMdXFQ= +github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= +github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod 
h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= @@ -123,8 +123,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= -github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30= -github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= diff --git a/pkg/apis/numaflow/v1alpha1/const.go b/pkg/apis/numaflow/v1alpha1/const.go index f65e2a5bd7..574752c304 100644 --- a/pkg/apis/numaflow/v1alpha1/const.go +++ b/pkg/apis/numaflow/v1alpha1/const.go @@ -26,7 +26,8 @@ const ( Project = "numaflow" // label/annotation keys. 
- KeyHash = "numaflow.numaproj.io/hash" // hash of the object + KeyInstance = "numaflow.numaproj.io/instance" // instance key of the object + KeyHash = "numaflow.numaproj.io/hash" // hash of the object KeyComponent = "app.kubernetes.io/component" KeyPartOf = "app.kubernetes.io/part-of" KeyManagedBy = "app.kubernetes.io/managed-by" diff --git a/pkg/reconciler/config.go b/pkg/reconciler/config.go index 16d368abf3..993c9bee4a 100644 --- a/pkg/reconciler/config.go +++ b/pkg/reconciler/config.go @@ -36,6 +36,7 @@ type GlobalConfig struct { } type config struct { + Instance string `json:"instance"` Defaults *DefaultConfig `json:"defaults"` ISBSvc *ISBSvcConfig `json:"isbsvc"` } @@ -83,6 +84,12 @@ type JetStreamVersion struct { StartCommand string `json:"startCommand"` } +func (g *GlobalConfig) GetInstance() string { + g.lock.RLock() + defer g.lock.RUnlock() + return g.conf.Instance +} + // Get controller scope default config func (g *GlobalConfig) GetDefaults() DefaultConfig { g.lock.RLock() diff --git a/pkg/reconciler/config_test.go b/pkg/reconciler/config_test.go index 5befdc0264..0896b57444 100644 --- a/pkg/reconciler/config_test.go +++ b/pkg/reconciler/config_test.go @@ -18,6 +18,7 @@ package reconciler import ( "reflect" + "sync" "testing" corev1 "k8s.io/api/core/v1" @@ -77,3 +78,36 @@ func TestGlobalConfig_GetDefaultContainerResources(t *testing.T) { } } } + +func TestGlobalConfig_GetInstance(t *testing.T) { + tests := []struct { + name string + instance string + }{ + { + name: "Empty instance", + instance: "", + }, + { + name: "Non-empty instance", + instance: "test-instance", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := &GlobalConfig{ + conf: &config{ + Instance: tt.instance, + }, + lock: &sync.RWMutex{}, + } + + got := g.GetInstance() + if got != tt.instance { + t.Errorf("GetInstance() = %v, want %v", got, tt.instance) + } + + }) + } +} diff --git a/pkg/reconciler/isbsvc/controller.go b/pkg/reconciler/isbsvc/controller.go 
index d94e14424d..987c9322f7 100644 --- a/pkg/reconciler/isbsvc/controller.go +++ b/pkg/reconciler/isbsvc/controller.go @@ -36,6 +36,7 @@ import ( dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/reconciler" "github.com/numaproj/numaflow/pkg/reconciler/isbsvc/installer" + "github.com/numaproj/numaflow/pkg/shared/logging" ) const ( @@ -67,6 +68,11 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct return ctrl.Result{}, err } log := r.logger.With("namespace", isbSvc.Namespace).With("isbsvc", isbSvc.Name) + if instance := isbSvc.GetAnnotations()[dfv1.KeyInstance]; instance != r.config.GetInstance() { + log.Debugw("ISB Service not managed by this controller, skipping", zap.String("instance", instance)) + return ctrl.Result{}, nil + } + ctx = logging.WithLogger(ctx, log) isbSvcCopy := isbSvc.DeepCopy() reconcileErr := r.reconcile(ctx, isbSvcCopy) if reconcileErr != nil { @@ -87,7 +93,7 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct // reconcile does the real logic func (r *interStepBufferServiceReconciler) reconcile(ctx context.Context, isbSvc *dfv1.InterStepBufferService) error { - log := r.logger.With("namespace", isbSvc.Namespace).With("isbsvc", isbSvc.Name) + log := logging.FromContext(ctx) if !isbSvc.DeletionTimestamp.IsZero() { log.Info("Deleting ISB Service") if controllerutil.ContainsFinalizer(isbSvc, finalizerName) { diff --git a/pkg/reconciler/monovertex/controller.go b/pkg/reconciler/monovertex/controller.go index 5e31ed6f28..a40f620c69 100644 --- a/pkg/reconciler/monovertex/controller.go +++ b/pkg/reconciler/monovertex/controller.go @@ -72,6 +72,10 @@ func (mr *monoVertexReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } log := mr.logger.With("namespace", monoVtx.Namespace).With("monoVertex", monoVtx.Name) + if instance := monoVtx.GetAnnotations()[dfv1.KeyInstance]; instance != mr.config.GetInstance() { + 
log.Debugw("MonoVertex not managed by this controller, skipping", zap.String("instance", instance)) + return ctrl.Result{}, nil + } ctx = logging.WithLogger(ctx, log) monoVtxCopy := monoVtx.DeepCopy() result, err := mr.reconcile(ctx, monoVtxCopy) diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go index 3ae7c49c00..0af2fb4788 100644 --- a/pkg/reconciler/pipeline/controller.go +++ b/pkg/reconciler/pipeline/controller.go @@ -81,6 +81,10 @@ func (r *pipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } log := r.logger.With("namespace", pl.Namespace).With("pipeline", pl.Name) + if instance := pl.GetAnnotations()[dfv1.KeyInstance]; instance != r.config.GetInstance() { + log.Debugw("Pipeline not managed by this controller, skipping", zap.String("instance", instance)) + return ctrl.Result{}, nil + } plCopy := pl.DeepCopy() ctx = logging.WithLogger(ctx, log) result, reconcileErr := r.reconcile(ctx, plCopy) @@ -236,6 +240,10 @@ func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *df log.Errorw("Failed to get ISB Service", zap.String("isbsvc", isbSvcName), zap.Error(err)) return err } + if isbSvc.GetAnnotations()[dfv1.KeyInstance] != pl.GetAnnotations()[dfv1.KeyInstance] { + log.Errorw("ISB Service is found but not managed by the same controller of this pipeline", zap.String("isbsvc", isbSvcName), zap.Error(err)) + return fmt.Errorf("isbsvc not managed by the same controller of this pipeline") + } if !isbSvc.Status.IsHealthy() { log.Errorw("ISB Service is not in healthy status", zap.String("isbsvc", isbSvcName), zap.Error(err)) return fmt.Errorf("isbsvc not healthy") @@ -652,6 +660,10 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex { }, Spec: spec, } + // If corresponding pipeline has instance annotation, we should copy it to the vertex + if x := pl.GetAnnotations()[dfv1.KeyInstance]; x != "" { + obj.Annotations[dfv1.KeyInstance] = x + } 
result[obj.Name] = obj } return result diff --git a/pkg/reconciler/vertex/controller.go b/pkg/reconciler/vertex/controller.go index 20945639ab..8d520609bf 100644 --- a/pkg/reconciler/vertex/controller.go +++ b/pkg/reconciler/vertex/controller.go @@ -72,6 +72,10 @@ func (r *vertexReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } log := r.logger.With("namespace", vertex.Namespace).With("vertex", vertex.Name).With("pipeline", vertex.Spec.PipelineName) ctx = logging.WithLogger(ctx, log) + if instance := vertex.GetAnnotations()[dfv1.KeyInstance]; instance != r.config.GetInstance() { + log.Debugw("Vertex not managed by this controller, skipping", zap.String("instance", instance)) + return ctrl.Result{}, nil + } vertexCopy := vertex.DeepCopy() result, err := r.reconcile(ctx, vertexCopy) if err != nil { @@ -117,6 +121,10 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) ( vertex.Status.MarkPhaseFailed("FindISBSvcFailed", err.Error()) return ctrl.Result{}, err } + if isbSvc.GetAnnotations()[dfv1.KeyInstance] != vertex.GetAnnotations()[dfv1.KeyInstance] { + log.Errorw("ISB Service is found but not managed by the same controller of this vertex", zap.String("isbsvc", isbSvcName), zap.Error(err)) + return ctrl.Result{}, fmt.Errorf("isbsvc not managed by the same controller of this vertex") + } if !isbSvc.Status.IsHealthy() { log.Errorw("ISB Service is not in healthy status", zap.String("isbsvc", isbSvcName), zap.Error(err)) vertex.Status.MarkPhaseFailed("ISBSvcNotHealthy", "isbsvc not healthy")