Skip to content

Commit

Permalink
feat: support multiple controller with instance config (#2153)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Oct 15, 2024
1 parent 1ea4d2e commit c95d930
Show file tree
Hide file tree
Showing 15 changed files with 118 additions and 8 deletions.
2 changes: 2 additions & 0 deletions config/advanced-install/namespaced-controller-wo-crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ metadata:
apiVersion: v1
data:
controller-config.yaml: |
# "instance" configuration can be used to run multiple Numaflow controllers, check details at https://numaflow.numaproj.io/operations/installation/#multiple-controllers
instance: ""
defaults:
containerResources: |
requests:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ metadata:
name: numaflow-controller-config
data:
controller-config.yaml: |+
# "instance" configuration can be used to run multiple Numaflow controllers, check details at https://numaflow.numaproj.io/operations/installation/#multiple-controllers
instance: ""
defaults:
containerResources: |
requests:
Expand Down
2 changes: 2 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28252,6 +28252,8 @@ metadata:
apiVersion: v1
data:
controller-config.yaml: |
# "instance" configuration can be used to run multiple Numaflow controllers, check details at https://numaflow.numaproj.io/operations/installation/#multiple-controllers
instance: ""
defaults:
containerResources: |
requests:
Expand Down
2 changes: 2 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28143,6 +28143,8 @@ metadata:
apiVersion: v1
data:
controller-config.yaml: |
# "instance" configuration can be used to run multiple Numaflow controllers, check details at https://numaflow.numaproj.io/operations/installation/#multiple-controllers
instance: ""
defaults:
containerResources: |
requests:
Expand Down
26 changes: 26 additions & 0 deletions docs/operations/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,29 @@ data:
```
If HA is turned off, the controller deployment should not run with multiple replicas.
## Multiple Controllers
With in one cluster, or even in one namespace, you can run multiple Numaflow controllers by leveraging the `instance` configuration in the `numaflow-controller-config` ConfigMap.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: numaflow-controller-config
data:
controller-config.yaml: |
# Within a cluster, setting "instance" can be used to run N Numaflow controllers.
# If configured, the controller will only watch the objects having an annotation with the key "numaflow.numaproj.io/instance" and the corresponding value.
# If not configured (or empty string), the controller will watch all objects.
instance: ""
defaults:
containerResources: |
requests:
memory: "128Mi"
cpu: "100m"
isbsvc:
...
```

When `instance` is configured (e.g. `my-instance`), the controller will only watch the objects (`InterStepBufferService`, `Pipeline` and `MonoVertex`) having the annotation `numaflow.numaproj.io/instance: my-instance`. Correspondingly, if a `Pipeline` object has an annotation `numaflow.numaproj.io/instance: my-instance`, it requires the referenced `InterStepBufferService` also has the same annotation, or it will fail to orchestrate the pipeline.
4 changes: 4 additions & 0 deletions docs/operations/numaflow-controller-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ metadata:
name: numaflow-controller-config
data:
controller-config.yaml: |
# Within a cluster, setting "instance" can be used to run N Numaflow controllers.
# If configured, the controller will only watch the objects having an annotation with the key "numaflow.numaproj.io/instance" and the corresponding value.
# If not configured (or empty string), the controller will watch all objects.
instance: ""
defaults:
containerResources: |
requests:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/numaproj/numaflow
go 1.23.1

require (
github.com/IBM/sarama v1.43.2
github.com/IBM/sarama v1.43.3
github.com/Masterminds/semver/v3 v3.3.0
github.com/Masterminds/sprig/v3 v3.2.3
github.com/ahmetb/gen-crd-api-reference-docs v0.3.0
Expand Down Expand Up @@ -85,7 +85,7 @@ require (
github.com/chenzhuoyu/iasm v0.9.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/IBM/sarama v1.43.2 h1:HABeEqRUh32z8yzY2hGB/j8mHSzC/HA9zlEjqFNCzSw=
github.com/IBM/sarama v1.43.2/go.mod h1:Kyo4WkF24Z+1nz7xeVUFWIuKVV8RS3wM8mkvPKMdXFQ=
github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA=
github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI=
Expand Down Expand Up @@ -123,8 +123,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30=
github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ const (
Project = "numaflow"

// label/annotation keys.
KeyHash = "numaflow.numaproj.io/hash" // hash of the object
KeyInstance = "numaflow.numaproj.io/instance" // instance key of the object
KeyHash = "numaflow.numaproj.io/hash" // hash of the object
KeyComponent = "app.kubernetes.io/component"
KeyPartOf = "app.kubernetes.io/part-of"
KeyManagedBy = "app.kubernetes.io/managed-by"
Expand Down
7 changes: 7 additions & 0 deletions pkg/reconciler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type GlobalConfig struct {
}

type config struct {
Instance string `json:"instance"`
Defaults *DefaultConfig `json:"defaults"`
ISBSvc *ISBSvcConfig `json:"isbsvc"`
}
Expand Down Expand Up @@ -83,6 +84,12 @@ type JetStreamVersion struct {
StartCommand string `json:"startCommand"`
}

func (g *GlobalConfig) GetInstance() string {
g.lock.RLock()
defer g.lock.RUnlock()
return g.conf.Instance
}

// Get controller scope default config
func (g *GlobalConfig) GetDefaults() DefaultConfig {
g.lock.RLock()
Expand Down
34 changes: 34 additions & 0 deletions pkg/reconciler/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package reconciler

import (
"reflect"
"sync"
"testing"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -77,3 +78,36 @@ func TestGlobalConfig_GetDefaultContainerResources(t *testing.T) {
}
}
}

func TestGlobalConfig_GetInstance(t *testing.T) {
tests := []struct {
name string
instance string
}{
{
name: "Empty instance",
instance: "",
},
{
name: "Non-empty instance",
instance: "test-instance",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := &GlobalConfig{
conf: &config{
Instance: tt.instance,
},
lock: &sync.RWMutex{},
}

got := g.GetInstance()
if got != tt.instance {
t.Errorf("GetInstance() = %v, want %v", got, tt.instance)
}

})
}
}
8 changes: 7 additions & 1 deletion pkg/reconciler/isbsvc/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/reconciler"
"github.com/numaproj/numaflow/pkg/reconciler/isbsvc/installer"
"github.com/numaproj/numaflow/pkg/shared/logging"
)

const (
Expand Down Expand Up @@ -67,6 +68,11 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct
return ctrl.Result{}, err
}
log := r.logger.With("namespace", isbSvc.Namespace).With("isbsvc", isbSvc.Name)
if instance := isbSvc.GetAnnotations()[dfv1.KeyInstance]; instance != r.config.GetInstance() {
log.Debugw("ISB Service not managed by this controller, skipping", zap.String("instance", instance))
return ctrl.Result{}, nil
}
ctx = logging.WithLogger(ctx, log)
isbSvcCopy := isbSvc.DeepCopy()
reconcileErr := r.reconcile(ctx, isbSvcCopy)
if reconcileErr != nil {
Expand All @@ -87,7 +93,7 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct

// reconcile does the real logic
func (r *interStepBufferServiceReconciler) reconcile(ctx context.Context, isbSvc *dfv1.InterStepBufferService) error {
log := r.logger.With("namespace", isbSvc.Namespace).With("isbsvc", isbSvc.Name)
log := logging.FromContext(ctx)
if !isbSvc.DeletionTimestamp.IsZero() {
log.Info("Deleting ISB Service")
if controllerutil.ContainsFinalizer(isbSvc, finalizerName) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/reconciler/monovertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (mr *monoVertexReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}
log := mr.logger.With("namespace", monoVtx.Namespace).With("monoVertex", monoVtx.Name)
if instance := monoVtx.GetAnnotations()[dfv1.KeyInstance]; instance != mr.config.GetInstance() {
log.Debugw("MonoVertex not managed by this controller, skipping", zap.String("instance", instance))
return ctrl.Result{}, nil
}
ctx = logging.WithLogger(ctx, log)
monoVtxCopy := monoVtx.DeepCopy()
result, err := mr.reconcile(ctx, monoVtxCopy)
Expand Down
12 changes: 12 additions & 0 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (r *pipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}
log := r.logger.With("namespace", pl.Namespace).With("pipeline", pl.Name)
if instance := pl.GetAnnotations()[dfv1.KeyInstance]; instance != r.config.GetInstance() {
log.Debugw("Pipeline not managed by this controller, skipping", zap.String("instance", instance))
return ctrl.Result{}, nil
}
plCopy := pl.DeepCopy()
ctx = logging.WithLogger(ctx, log)
result, reconcileErr := r.reconcile(ctx, plCopy)
Expand Down Expand Up @@ -236,6 +240,10 @@ func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *df
log.Errorw("Failed to get ISB Service", zap.String("isbsvc", isbSvcName), zap.Error(err))
return err
}
if isbSvc.GetAnnotations()[dfv1.KeyInstance] != pl.GetAnnotations()[dfv1.KeyInstance] {
log.Errorw("ISB Service is found but not managed by the same controller of this pipeline", zap.String("isbsvc", isbSvcName), zap.Error(err))
return fmt.Errorf("isbsvc not managed by the same controller of this pipeline")
}
if !isbSvc.Status.IsHealthy() {
log.Errorw("ISB Service is not in healthy status", zap.String("isbsvc", isbSvcName), zap.Error(err))
return fmt.Errorf("isbsvc not healthy")
Expand Down Expand Up @@ -652,6 +660,10 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex {
},
Spec: spec,
}
// If corresponding pipline has instance annotation, we should copy it to the vertex
if x := pl.GetAnnotations()[dfv1.KeyInstance]; x != "" {
obj.Annotations[dfv1.KeyInstance] = x
}
result[obj.Name] = obj
}
return result
Expand Down
8 changes: 8 additions & 0 deletions pkg/reconciler/vertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (r *vertexReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}
log := r.logger.With("namespace", vertex.Namespace).With("vertex", vertex.Name).With("pipeline", vertex.Spec.PipelineName)
ctx = logging.WithLogger(ctx, log)
if instance := vertex.GetAnnotations()[dfv1.KeyInstance]; instance != r.config.GetInstance() {
log.Debugw("Vertex not managed by this controller, skipping", zap.String("instance", instance))
return ctrl.Result{}, nil
}
vertexCopy := vertex.DeepCopy()
result, err := r.reconcile(ctx, vertexCopy)
if err != nil {
Expand Down Expand Up @@ -117,6 +121,10 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
vertex.Status.MarkPhaseFailed("FindISBSvcFailed", err.Error())
return ctrl.Result{}, err
}
if isbSvc.GetAnnotations()[dfv1.KeyInstance] != vertex.GetAnnotations()[dfv1.KeyInstance] {
log.Errorw("ISB Service is found but not managed by the same controller of this vertex", zap.String("isbsvc", isbSvcName), zap.Error(err))
return ctrl.Result{}, fmt.Errorf("isbsvc not managed by the same controller of this vertex")
}
if !isbSvc.Status.IsHealthy() {
log.Errorw("ISB Service is not in healthy status", zap.String("isbsvc", isbSvcName), zap.Error(err))
vertex.Status.MarkPhaseFailed("ISBSvcNotHealthy", "isbsvc not healthy")
Expand Down

0 comments on commit c95d930

Please sign in to comment.