diff --git a/config/channel/consolidated/500-dispatcher.yaml b/config/channel/consolidated/500-dispatcher.yaml deleted file mode 120000 index 4baa3fdedf..0000000000 --- a/config/channel/consolidated/500-dispatcher.yaml +++ /dev/null @@ -1 +0,0 @@ -deployments/dispatcher.yaml \ No newline at end of file diff --git a/config/channel/consolidated/deployments/controller.yaml b/config/channel/consolidated/deployments/controller.yaml index 823d5cae4d..acd482d544 100644 --- a/config/channel/consolidated/deployments/controller.yaml +++ b/config/channel/consolidated/deployments/controller.yaml @@ -18,6 +18,8 @@ metadata: name: kafka-ch-controller namespace: knative-eventing labels: + messaging.knative.dev/channel: kafka-channel + messaging.knative.dev/role: controller kafka.eventing.knative.dev/release: devel spec: replicas: 1 diff --git a/config/channel/consolidated/deployments/dispatcher.yaml b/config/channel/consolidated/deployments/dispatcher.yaml deleted file mode 100644 index 90c462cfc8..0000000000 --- a/config/channel/consolidated/deployments/dispatcher.yaml +++ /dev/null @@ -1,103 +0,0 @@ -# Copyright 2020 The Knative Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -apiVersion: apps/v1 -kind: Deployment -metadata: - name: kafka-ch-dispatcher - namespace: knative-eventing - labels: - kafka.eventing.knative.dev/release: devel -spec: - # this deployment is going to be scaled up by the - # controller when the very first KafkaChannel is created - replicas: 0 - selector: - matchLabels: - messaging.knative.dev/channel: kafka-channel - messaging.knative.dev/role: dispatcher - template: - metadata: - annotations: - # This annotation is used by the controller to track updates - # to config-kafka and apply them in the dispatcher deployment - kafka.eventing.knative.dev/configmap-hash: '' - labels: - # Do not change. Used by the controller for probing. - messaging.knative.dev/channel: kafka-channel - # Do not change. Used by the controller for probing. - messaging.knative.dev/role: dispatcher - kafka.eventing.knative.dev/release: devel - spec: - containers: - - name: dispatcher - image: ko://knative.dev/eventing-kafka/cmd/channel/consolidated/dispatcher - env: - - name: SYSTEM_NAMESPACE - value: '' - valueFrom: - fieldRef: - apiVersion: v1 - fieldPath: metadata.namespace - - name: POD_NAME - valueFrom: - fieldRef: - fieldPath: metadata.name - - name: METRICS_DOMAIN - value: knative.dev/eventing - - name: CONFIG_LOGGING_NAME - value: config-logging - - name: CONFIG_LEADERELECTION_NAME - value: config-leader-election - - name: CONTAINER_NAME - value: dispatcher - ports: - - containerPort: 9090 - name: metrics - protocol: TCP - - containerPort: 8081 - name: sub-status - protocol: TCP - volumeMounts: - - name: config-kafka - mountPath: /etc/config-kafka - serviceAccountName: kafka-ch-dispatcher - volumes: - - name: config-kafka - configMap: - name: config-kafka - ---- - -apiVersion: v1 -kind: Service -metadata: - labels: - messaging.knative.dev/channel: kafka-channel - messaging.knative.dev/role: dispatcher - name: kafka-ch-dispatcher - namespace: knative-eventing -spec: - ports: - - name: http-dispatcher - port: 80 - protocol: TCP - targetPort: 8080 - - name: http-sub-status - port: 8081 - protocol: TCP - targetPort: 8081 - selector: - messaging.knative.dev/channel: kafka-channel - messaging.knative.dev/role: dispatcher diff --git a/pkg/channel/consolidated/reconciler/controller/controller.go b/pkg/channel/consolidated/reconciler/controller/controller.go index 2aa823051a..f96afaf1fe 100644 --- a/pkg/channel/consolidated/reconciler/controller/controller.go +++ b/pkg/channel/consolidated/reconciler/controller/controller.go @@ -18,24 +18,22 @@ package controller import ( "context" + "fmt" + "time" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" - - "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" - "knative.dev/eventing-kafka/pkg/channel/consolidated/status" - kafkamessagingv1beta1 "knative.dev/eventing-kafka/pkg/client/informers/externalversions/messaging/v1beta1" - kafkaChannelClient "knative.dev/eventing-kafka/pkg/client/injection/client" - "knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel" - kafkaChannelReconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" - commonconfig "knative.dev/eventing-kafka/pkg/common/config" + "k8s.io/utils/pointer" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" eventingClient "knative.dev/eventing/pkg/client/injection/client" kubeclient "knative.dev/pkg/client/injection/kube/client" - "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" + deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints" podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" "knative.dev/pkg/client/injection/kube/informers/core/v1/service" @@ -46,13 +44,24 @@ import ( "knative.dev/pkg/logging" knativeReconciler "knative.dev/pkg/reconciler" "knative.dev/pkg/system" + + "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" + "knative.dev/eventing-kafka/pkg/channel/consolidated/status" + kafkamessagingv1beta1 "knative.dev/eventing-kafka/pkg/client/informers/externalversions/messaging/v1beta1" + kafkaChannelClient "knative.dev/eventing-kafka/pkg/client/injection/client" + "knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel" + kafkaChannelReconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" ) const ( - channelLabelKey = "messaging.knative.dev/channel" - channelLabelValue = "kafka-channel" - roleLabelKey = "messaging.knative.dev/role" - roleLabelValue = "dispatcher" + channelLabelKey = "messaging.knative.dev/channel" + channelLabelValue = "kafka-channel" + roleLabelKey = "messaging.knative.dev/role" + dispatcherRoleLabelValue = "dispatcher" + controllerRoleLabelValue = "controller" + interval = 100 * time.Millisecond + timeout = 5 * time.Minute ) // NewController initializes the controller and is called by the generated code. @@ -63,7 +72,7 @@ func NewController( ) *controller.Impl { logger := logging.FromContext(ctx) kafkaChannelInformer := kafkachannel.Get(ctx) - deploymentInformer := deployment.Get(ctx) + deploymentInformer := deploymentinformer.Get(ctx) endpointsInformer := endpointsinformer.Get(ctx) serviceAccountInformer := serviceaccount.Get(ctx) roleBindingInformer := rolebinding.Get(ctx) @@ -92,6 +101,13 @@ func NewController( r.dispatcherImage = env.Image r.dispatcherServiceAccount = env.DispatcherServiceAccount + // get the ref of the controller deployment + ownerRef, err := getControllerOwnerRef(ctx) + if err != nil { + logger.Fatalw("Could not determine the proper owner reference for the dispatcher deployment.", zap.Error(err)) + } + r.controllerRef = *ownerRef + impl := kafkaChannelReconciler.NewImpl(ctx, r) statusProber := status.NewProber( @@ -118,7 +134,7 @@ func NewController( } // Get and Watch the Kakfa config map and dynamically update Kafka configuration. - err := commonconfig.InitializeKafkaConfigMapWatcher(ctx, cmw, logger, handleKafkaConfigMapChange, system.Namespace()) + err = commonconfig.InitializeKafkaConfigMapWatcher(ctx, cmw, logger, handleKafkaConfigMapChange, system.Namespace()) if err != nil { logger.Fatal("Failed To Initialize ConfigMap Watcher", zap.Error(err)) } @@ -155,7 +171,7 @@ func NewController( podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: knativeReconciler.ChainFilterFuncs( knativeReconciler.LabelFilterFunc(channelLabelKey, channelLabelValue, false), - knativeReconciler.LabelFilterFunc(roleLabelKey, roleLabelValue, false), + knativeReconciler.LabelFilterFunc(roleLabelKey, dispatcherRoleLabelValue, false), ), Handler: cache.ResourceEventHandlerFuncs{ // Cancel probing when a Pod is deleted @@ -178,3 +194,38 @@ func getPodInformerEventHandler(ctx context.Context, logger *zap.SugaredLogger, } } } + +func getControllerOwnerRef(ctx context.Context) (*metav1.OwnerReference, error) { + logger := logging.FromContext(ctx) + ctrlDeploymentLabels := labels.Set{ + channelLabelKey: channelLabelValue, + roleLabelKey: controllerRoleLabelValue, + } + + ownerRef := metav1.OwnerReference{ + APIVersion: "apps/v1", + Kind: "Deployment", + Controller: pointer.BoolPtr(true), + } + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + k8sClient := kubeclient.Get(ctx) + deploymentList, err := k8sClient.AppsV1().Deployments(system.Namespace()).List(ctx, metav1.ListOptions{LabelSelector: ctrlDeploymentLabels.String()}) + if err != nil { + return true, fmt.Errorf("error listing KafkaChannel controller deployment labels %w", err) + } else if len(deploymentList.Items) == 0 { + // Simple exponential backoff + logger.Debugw("found zero KafkaChannel controller deployment matching labels. Retrying.", zap.String("namespace", system.Namespace()), zap.Any("selectors", ctrlDeploymentLabels.AsSelector())) + return false, nil + } else if len(deploymentList.Items) > 1 { + return true, fmt.Errorf("found an unexpected number of KafkaChannel controller deployment matching labels. Got: %d, Want: 1", len(deploymentList.Items)) + } + d := deploymentList.Items[0] + ownerRef.Name = d.Name + ownerRef.UID = d.UID + return true, nil + }) + if err != nil { + return nil, fmt.Errorf("failed to determine the deployment of the KafkaChannel controller based on labels. %w", err) + } + return &ownerRef, nil +} diff --git a/pkg/channel/consolidated/reconciler/controller/kafkachannel.go b/pkg/channel/consolidated/reconciler/controller/kafkachannel.go index 33e90cd717..40470a604f 100644 --- a/pkg/channel/consolidated/reconciler/controller/kafkachannel.go +++ b/pkg/channel/consolidated/reconciler/controller/kafkachannel.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/Shopify/sarama" + "github.com/google/go-cmp/cmp" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -34,7 +35,15 @@ import ( corev1listers "k8s.io/client-go/listers/core/v1" rbacv1listers "k8s.io/client-go/listers/rbac/v1" "k8s.io/client-go/tools/cache" - "k8s.io/utils/pointer" + v1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/eventing" + eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned" + "knative.dev/pkg/apis" + "knative.dev/pkg/apis/duck" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" + "knative.dev/pkg/network" + pkgreconciler "knative.dev/pkg/reconciler" "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" "knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/controller/resources" @@ -47,16 +56,6 @@ import ( "knative.dev/eventing-kafka/pkg/common/client" commonconfig "knative.dev/eventing-kafka/pkg/common/config" "knative.dev/eventing-kafka/pkg/common/constants" - commonconstants "knative.dev/eventing-kafka/pkg/common/constants" - v1 "knative.dev/eventing/pkg/apis/duck/v1" - "knative.dev/eventing/pkg/apis/eventing" - eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned" - "knative.dev/pkg/apis" - "knative.dev/pkg/apis/duck" - "knative.dev/pkg/controller" - "knative.dev/pkg/logging" - "knative.dev/pkg/network" - pkgreconciler "knative.dev/pkg/reconciler" ) const ( @@ -133,6 +132,7 @@ type Reconciler struct { serviceAccountLister corev1listers.ServiceAccountLister roleBindingLister rbacv1listers.RoleBindingLister statusManager status.Manager + controllerRef metav1.OwnerReference } type envConfig struct { @@ -284,6 +284,7 @@ func (r *Reconciler) reconcileSubscribers(ctx context.Context, ch *v1beta1.Kafka } func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, dispatcherNamespace string, kc *v1beta1.KafkaChannel) error { + logger := logging.FromContext(ctx) if scope == scopeNamespace { // Configure RBAC in namespace to access the configmaps sa, err := r.reconcileServiceAccount(ctx, dispatcherNamespace, kc) @@ -313,93 +314,50 @@ func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, disp Replicas: 1, ServiceAccount: r.dispatcherServiceAccount, ConfigMapHash: r.kafkaConfigMapHash, + OwnerRef: r.controllerRef, } - expected := resources.MakeDispatcher(args) + want := resources.NewDispatcherBuilder().WithArgs(&args).Build() d, err := r.deploymentLister.Deployments(dispatcherNamespace).Get(dispatcherName) if err != nil { if apierrs.IsNotFound(err) { - d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Create(ctx, expected, metav1.CreateOptions{}) + d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Create(ctx, want, metav1.CreateOptions{}) if err == nil { controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentCreated, "Dispatcher deployment created") kc.Status.PropagateDispatcherStatus(&d.Status) return err } else { + logger.Errorw("error while creating dispatcher deployment", zap.Error(err), zap.String("namespace", dispatcherNamespace), zap.Any("deployment", want)) kc.Status.MarkDispatcherFailed(dispatcherDeploymentFailed, "Failed to create the dispatcher deployment: %v", err) return newDeploymentWarn(err) } } - - logging.FromContext(ctx).Errorw("Unable to get the dispatcher deployment", zap.Error(err)) + logger.Errorw("can't get dispatcher deployment", zap.Error(err), zap.String("namespace", dispatcherNamespace), + zap.String("dispatcher-name", dispatcherName)) kc.Status.MarkDispatcherUnknown("DispatcherDeploymentFailed", "Failed to get dispatcher deployment: %v", err) return err } else { - existing := utils.FindContainer(d, resources.DispatcherContainerName) - if existing == nil { - logging.FromContext(ctx).Errorw("Container %s does not exist in existing dispatcher deployment. Updating the deployment", resources.DispatcherContainerName) - d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(ctx, expected, metav1.UpdateOptions{}) - if err == nil { - controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated") - kc.Status.PropagateDispatcherStatus(&d.Status) - return nil - } else { - kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err) - } - return newDeploymentWarn(err) - } - - expectedContainer := utils.FindContainer(expected, resources.DispatcherContainerName) - if expectedContainer == nil { - return fmt.Errorf("container %s does not exist in expected dispatcher deployment. Cannot check if the deployment needs an update", resources.DispatcherContainerName) - } - - expectedConfigMapHash := r.kafkaConfigMapHash - - needsUpdate := false - // do not touch the original deployment, deepcopy it - deploymentCopy := d.DeepCopy() - existingCopy := utils.FindContainer(deploymentCopy, resources.DispatcherContainerName) - - if existingCopy.Image != expectedContainer.Image { - logging.FromContext(ctx).Infof("Dispatcher deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image) - existingCopy.Image = expectedContainer.Image - needsUpdate = true - } - - if *deploymentCopy.Spec.Replicas == 0 { - logging.FromContext(ctx).Infof("Dispatcher deployment has 0 replica. Scaling up deployment to 1 replica") - deploymentCopy.Spec.Replicas = pointer.Int32Ptr(1) - needsUpdate = true - } - - if deploymentCopy.Spec.Template.Annotations == nil { - logging.FromContext(ctx).Infof("Configmap hash is not set. Updating the dispatcher deployment.") - deploymentCopy.Spec.Template.Annotations = map[string]string{ - commonconstants.ConfigMapHashAnnotationKey: expectedConfigMapHash, - } - needsUpdate = true - } - - if deploymentCopy.Spec.Template.Annotations[commonconstants.ConfigMapHashAnnotationKey] != expectedConfigMapHash { - logging.FromContext(ctx).Infof("Configmap hash is changed. Updating the dispatcher deployment.") - deploymentCopy.Spec.Template.Annotations[commonconstants.ConfigMapHashAnnotationKey] = expectedConfigMapHash - needsUpdate = true + // scale up the dispatcher to 1, otherwise keep the existing number in case the user has scaled it up. + if *d.Spec.Replicas == 0 { + logger.Infof("Dispatcher deployment has 0 replica. Scaling up deployment to 1 replica") + args.Replicas = 1 + } else { + args.Replicas = *d.Spec.Replicas } + want = resources.NewDispatcherBuilderFromDeployment(d.DeepCopy()).WithArgs(&args).Build() - if needsUpdate { - deploymentCopy, err = r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(ctx, deploymentCopy, metav1.UpdateOptions{}) - if err == nil { - controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated") - kc.Status.PropagateDispatcherStatus(&deploymentCopy.Status) - return nil - } else { + if !equality.Semantic.DeepEqual(want.Spec, d.Spec) { + logger.Infof("Dispatcher deployment changed; reconciling:\n%s", cmp.Diff(want.Spec, d.Spec)) + if d, err = r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(ctx, want, metav1.UpdateOptions{}); err != nil { + logger.Errorw("error while updating dispatcher deployment", zap.Error(err), zap.String("namespace", dispatcherNamespace), zap.Any("deployment", want)) kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err) return newDeploymentWarn(err) + } else { + controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated") } - } else { - kc.Status.PropagateDispatcherStatus(&d.Status) - return nil } + kc.Status.PropagateDispatcherStatus(&d.Status) + return nil } } diff --git a/pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go b/pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go index d4e95ec686..42ce3c7594 100644 --- a/pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go +++ b/pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go @@ -607,7 +607,6 @@ func TestDeploymentMoreThanOneReplicas(t *testing.T) { reconcilertesting.WithKafkaFinalizer(finalizerName), reconcilertesting.WithKafkaChannelConfigReady(), reconcilertesting.WithKafkaChannelTopicReady(), - // reconcilekafkatesting.WithKafkaChannelDeploymentReady(), reconcilertesting.WithKafkaChannelServiceReady(), reconcilertesting.WithKafkaChannelEndpointsReady(), reconcilertesting.WithKafkaChannelChannelServiceReady(), @@ -834,13 +833,14 @@ func (ca *mockClusterAdmin) DeleteConsumerGroup(group string) error { var _ sarama.ClusterAdmin = (*mockClusterAdmin)(nil) func makeDeploymentWithParams(image string, replicas int32, configMapHash string) *appsv1.Deployment { - return resources.MakeDispatcher(resources.DispatcherArgs{ + args := resources.DispatcherArgs{ DispatcherNamespace: testNS, Image: image, Replicas: replicas, ServiceAccount: testDispatcherserviceAccount, ConfigMapHash: configMapHash, - }) + } + return resources.NewDispatcherBuilder().WithArgs(&args).Build() } func makeDeploymentWithImageAndReplicas(image string, replicas int32) *appsv1.Deployment { diff --git a/pkg/channel/consolidated/reconciler/controller/resources/dispatcher.go b/pkg/channel/consolidated/reconciler/controller/resources/dispatcher.go index 9dcb60989d..5686d9183a 100644 --- a/pkg/channel/consolidated/reconciler/controller/resources/dispatcher.go +++ b/pkg/channel/consolidated/reconciler/controller/resources/dispatcher.go @@ -20,9 +20,10 @@ import ( v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/system" + "knative.dev/eventing-kafka/pkg/common/constants" commonconstants "knative.dev/eventing-kafka/pkg/common/constants" - "knative.dev/pkg/system" ) const ( @@ -37,6 +38,11 @@ var ( } ) +type DispatcherBuilder struct { + deployment *v1.Deployment + args *DispatcherArgs +} + type DispatcherArgs struct { DispatcherScope string DispatcherNamespace string @@ -44,11 +50,53 @@ type DispatcherArgs struct { Replicas int32 ServiceAccount string ConfigMapHash string + OwnerRef metav1.OwnerReference +} + +// NewDispatcherBuilder returns a builder which builds from scratch a dispatcher deployment. +// Intended to be used when creating the dispatcher deployment for the first time. +func NewDispatcherBuilder() *DispatcherBuilder { + b := &DispatcherBuilder{} + b.deployment = dispatcherTemplate() + return b +} + +// NewDispatcherBuilderFromDeployment returns a builder which builds a dispatcher deployment from the given deployment. +// Intended to be used when updating an existing dispatcher deployment. +func NewDispatcherBuilderFromDeployment(d *v1.Deployment) *DispatcherBuilder { + b := &DispatcherBuilder{} + b.deployment = d + return b } -// MakeDispatcher generates the dispatcher deployment for the KafKa channel -func MakeDispatcher(args DispatcherArgs) *v1.Deployment { - replicas := args.Replicas +func (b *DispatcherBuilder) WithArgs(args *DispatcherArgs) *DispatcherBuilder { + b.args = args + return b +} + +func (b *DispatcherBuilder) Build() *v1.Deployment { + replicas := b.args.Replicas + b.deployment.ObjectMeta.Namespace = b.args.DispatcherNamespace + b.deployment.ObjectMeta.OwnerReferences = []metav1.OwnerReference{b.args.OwnerRef} + b.deployment.Spec.Replicas = &replicas + b.deployment.Spec.Template.ObjectMeta.Annotations = map[string]string{ + commonconstants.ConfigMapHashAnnotationKey: b.args.ConfigMapHash, + } + b.deployment.Spec.Template.Spec.ServiceAccountName = b.args.ServiceAccount + + for i, c := range b.deployment.Spec.Template.Spec.Containers { + if c.Name == DispatcherContainerName { + container := &b.deployment.Spec.Template.Spec.Containers[i] + container.Image = b.args.Image + if container.Env == nil { + container.Env = makeEnv(b.args) + } + } + } + return b.deployment +} + +func dispatcherTemplate() *v1.Deployment { return &v1.Deployment{ TypeMeta: metav1.TypeMeta{ @@ -56,28 +104,20 @@ func MakeDispatcher(args DispatcherArgs) *v1.Deployment { Kind: "Deployments", }, ObjectMeta: metav1.ObjectMeta{ - Name: dispatcherName, - Namespace: args.DispatcherNamespace, + Name: dispatcherName, }, Spec: v1.DeploymentSpec{ - Replicas: &replicas, Selector: &metav1.LabelSelector{ MatchLabels: dispatcherLabels, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: dispatcherLabels, - Annotations: map[string]string{ - commonconstants.ConfigMapHashAnnotationKey: args.ConfigMapHash, - }, }, Spec: corev1.PodSpec{ - ServiceAccountName: args.ServiceAccount, Containers: []corev1.Container{ { - Name: DispatcherContainerName, - Image: args.Image, - Env: makeEnv(args), + Name: DispatcherContainerName, Ports: []corev1.ContainerPort{{ Name: "metrics", ContainerPort: 9090, @@ -108,7 +148,7 @@ func MakeDispatcher(args DispatcherArgs) *v1.Deployment { } } -func makeEnv(args DispatcherArgs) []corev1.EnvVar { +func makeEnv(args *DispatcherArgs) []corev1.EnvVar { vars := []corev1.EnvVar{{ Name: system.NamespaceEnvKey, Value: system.Namespace(), @@ -131,18 +171,20 @@ func makeEnv(args DispatcherArgs) []corev1.EnvVar { FieldPath: "metadata.namespace", }, }, - }, corev1.EnvVar{ - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.name", - }, - }, - }, corev1.EnvVar{ - Name: "CONTAINER_NAME", - Value: "dispatcher", }) } + vars = append(vars, corev1.EnvVar{ + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + APIVersion: "", + FieldPath: "metadata.name", + }, + }, + }, corev1.EnvVar{ + Name: "CONTAINER_NAME", + Value: "dispatcher", + }) return vars } diff --git a/pkg/channel/consolidated/reconciler/controller/resources/dispatcher_service_test.go b/pkg/channel/consolidated/reconciler/controller/resources/dispatcher_service_test.go deleted file mode 100644 index 729e27f8f8..0000000000 --- a/pkg/channel/consolidated/reconciler/controller/resources/dispatcher_service_test.go +++ /dev/null @@ -1,57 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" -) - -func TestNewDispatcherService(t *testing.T) { - want := &corev1.Service{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Service", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: dispatcherName, - Namespace: testNS, - Labels: dispatcherLabels, - }, - Spec: corev1.ServiceSpec{ - Selector: dispatcherLabels, - Ports: []corev1.ServicePort{ - { - Name: "http-dispatcher", - Protocol: corev1.ProtocolTCP, - Port: 80, - TargetPort: intstr.IntOrString{IntVal: 8080}, - }, - }, - }, - } - - got := MakeDispatcherService(testNS) - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("unexpected condition (-want, +got) = %v", diff) - } -} diff --git a/pkg/channel/consolidated/reconciler/controller/resources/dispatcher_test.go b/pkg/channel/consolidated/reconciler/controller/resources/dispatcher_test.go deleted file mode 100644 index 7cb872d300..0000000000 --- a/pkg/channel/consolidated/reconciler/controller/resources/dispatcher_test.go +++ /dev/null @@ -1,225 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -import ( - "os" - "testing" - - "github.com/google/go-cmp/cmp" - v1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - commonconstants "knative.dev/eventing-kafka/pkg/common/constants" - "knative.dev/pkg/system" -) - -const ( - imageName = "my-test-image" - serviceAccount = "kafka-ch-dispatcher" -) - -func TestNewDispatcher(t *testing.T) { - os.Setenv(system.NamespaceEnvKey, "knative-testing") - - args := DispatcherArgs{ - DispatcherScope: "cluster", - DispatcherNamespace: testNS, - Image: imageName, - Replicas: 1, - ServiceAccount: serviceAccount, - ConfigMapHash: testConfigMapHash, - } - - replicas := int32(1) - want := &v1.Deployment{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "apps/v1", - Kind: "Deployments", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: testNS, - Name: dispatcherName, - }, - Spec: v1.DeploymentSpec{ - Replicas: &replicas, - Selector: &metav1.LabelSelector{ - MatchLabels: dispatcherLabels, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: dispatcherLabels, - Annotations: map[string]string{ - commonconstants.ConfigMapHashAnnotationKey: testConfigMapHash, - }, - }, - Spec: corev1.PodSpec{ - ServiceAccountName: serviceAccount, - Containers: []corev1.Container{ - { - Name: "dispatcher", - Image: imageName, - Env: []corev1.EnvVar{{ - Name: system.NamespaceEnvKey, - Value: "knative-testing", - }, { - Name: "METRICS_DOMAIN", - Value: "knative.dev/eventing", - }, { - Name: "CONFIG_LOGGING_NAME", - Value: "config-logging", - }, { - Name: "CONFIG_LEADERELECTION_NAME", - Value: "config-leader-election", - }}, - Ports: []corev1.ContainerPort{{ - Name: "metrics", - ContainerPort: 9090, - }}, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "config-kafka", - MountPath: "/etc/config-kafka", - }, - }, - }}, - Volumes: []corev1.Volume{ - { - Name: "config-kafka", - VolumeSource: corev1.VolumeSource{ - ConfigMap: &corev1.ConfigMapVolumeSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: "config-kafka", - }, - }, - }, - }}, - }, - }, - }, - } - - got := MakeDispatcher(args) - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("unexpected condition (-want, +got) = %v", diff) - } -} - -func TestNewNamespaceDispatcher(t *testing.T) { - os.Setenv(system.NamespaceEnvKey, "knative-testing") - - args := DispatcherArgs{ - DispatcherScope: "namespace", - DispatcherNamespace: testNS, - Image: imageName, - Replicas: 1, - ServiceAccount: serviceAccount, - ConfigMapHash: testConfigMapHash, - } - - replicas := int32(1) - want := &v1.Deployment{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "apps/v1", - Kind: "Deployments", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: testNS, - Name: dispatcherName, - }, - Spec: v1.DeploymentSpec{ - Replicas: &replicas, - Selector: &metav1.LabelSelector{ - MatchLabels: dispatcherLabels, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: dispatcherLabels, - Annotations: map[string]string{ - commonconstants.ConfigMapHashAnnotationKey: testConfigMapHash, - }, - }, - Spec: corev1.PodSpec{ - ServiceAccountName: serviceAccount, - Containers: []corev1.Container{ - { - Name: "dispatcher", - Image: imageName, - Env: []corev1.EnvVar{{ - Name: system.NamespaceEnvKey, - Value: "knative-testing", - }, { - Name: "METRICS_DOMAIN", - Value: "knative.dev/eventing", - }, { - Name: "CONFIG_LOGGING_NAME", - Value: "config-logging", - }, { - Name: "CONFIG_LEADERELECTION_NAME", - Value: "config-leader-election", - }, { - Name: "NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, { - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.name", - }, - }, - }, { - Name: "CONTAINER_NAME", - Value: "dispatcher", - }}, - Ports: []corev1.ContainerPort{{ - Name: "metrics", - ContainerPort: 9090, - }}, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "config-kafka", - MountPath: "/etc/config-kafka", - }, - }, - }}, - Volumes: []corev1.Volume{ - { - Name: "config-kafka", - VolumeSource: corev1.VolumeSource{ - ConfigMap: &corev1.ConfigMapVolumeSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: "config-kafka", - }, - }, - }, - }}, - }, - }, - }, - } - - got := MakeDispatcher(args) - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("unexpected condition (-want, +got) = %v", diff) - } -} diff --git a/pkg/channel/consolidated/reconciler/controller/resources/role_binding_test.go b/pkg/channel/consolidated/reconciler/controller/resources/role_binding_test.go deleted file mode 100644 index 792fe9b3c4..0000000000 --- a/pkg/channel/consolidated/reconciler/controller/resources/role_binding_test.go +++ /dev/null @@ -1,58 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - rbacv1 "k8s.io/api/rbac/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -const ( - rbName = "my-test-role-binding" - crName = "my-test-cluster-role" -) - -func TestNewRoleBinding(t *testing.T) { - want := &rbacv1.RoleBinding{ - ObjectMeta: metav1.ObjectMeta{ - Name: rbName, - Namespace: testNS, - }, - RoleRef: rbacv1.RoleRef{ - APIGroup: "rbac.authorization.k8s.io", - Kind: "ClusterRole", - Name: crName, - }, - Subjects: []rbacv1.Subject{ - { - Kind: "ServiceAccount", - Namespace: testNS, - Name: serviceAccount, - }, - }, - } - - sa := MakeServiceAccount(testNS, serviceAccount) - got := MakeRoleBinding(testNS, rbName, sa, crName) - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("unexpected condition (-want, +got) = %v", diff) - } -} diff --git a/pkg/channel/consolidated/reconciler/controller/resources/service_account_test.go b/pkg/channel/consolidated/reconciler/controller/resources/service_account_test.go deleted file mode 100644 index 84b4968fbb..0000000000 --- a/pkg/channel/consolidated/reconciler/controller/resources/service_account_test.go +++ /dev/null @@ -1,40 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func TestNewServiceAccount(t *testing.T) { - want := &corev1.ServiceAccount{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testNS, - Name: serviceAccount, - }, - } - - got := MakeServiceAccount(testNS, serviceAccount) - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("unexpected condition (-want, +got) = %v", diff) - } -} diff --git a/pkg/channel/consolidated/reconciler/controller/resources/service_test.go b/pkg/channel/consolidated/reconciler/controller/resources/service_test.go deleted file mode 100644 index 940329a160..0000000000 --- a/pkg/channel/consolidated/reconciler/controller/resources/service_test.go +++ /dev/null @@ -1,137 +0,0 @@ -/* -Copyright 2019 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -import ( - "errors" - "fmt" - "testing" - - "github.com/google/go-cmp/cmp" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" - "knative.dev/pkg/kmeta" -) - -const ( - kcName = "my-test-kc" - testNS = "my-test-ns" - testDispatcherNS = "dispatcher-namespace" - testDispatcherName = "dispatcher-name" - testConfigMapHash = "deadbeef" -) - -func TestMakeChannelServiceAddress(t *testing.T) { - if want, got := "my-test-kc-kn-channel", MakeChannelServiceName(kcName); want != got { - t.Errorf("Want: %q got %q", want, got) - } -} - -func TestMakeService(t *testing.T) { - imc := &v1beta1.KafkaChannel{ - ObjectMeta: metav1.ObjectMeta{ - Name: kcName, - Namespace: testNS, - }, - } - want := &corev1.Service{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Service", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-kn-channel", kcName), - Namespace: testNS, - Labels: map[string]string{ - MessagingRoleLabel: MessagingRole, - }, - OwnerReferences: []metav1.OwnerReference{ - *kmeta.NewControllerRef(imc), - }, - }, - Spec: corev1.ServiceSpec{ - Ports: []corev1.ServicePort{ - { - Name: portName, - Protocol: corev1.ProtocolTCP, - Port: portNumber, - }, - }, - }, - } - - got, err := MakeK8sService(imc) - if err != nil { - t.Fatalf("Failed to create new service: %s", err) - } - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("unexpected condition (-want, +got) = %v", diff) - } -} - -func TestMakeServiceWithExternal(t *testing.T) { - imc := &v1beta1.KafkaChannel{ - ObjectMeta: metav1.ObjectMeta{ - Name: kcName, - Namespace: testNS, - }, - } - want := &corev1.Service{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Service", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-kn-channel", kcName), - Namespace: testNS, - Labels: map[string]string{ - MessagingRoleLabel: MessagingRole, - }, - OwnerReferences: []metav1.OwnerReference{ - *kmeta.NewControllerRef(imc), - }, - }, - Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeExternalName, - ExternalName: "dispatcher-name.dispatcher-namespace.svc.cluster.local", - }, - } - - got, err := MakeK8sService(imc, ExternalService(testDispatcherNS, testDispatcherName)) - if err != nil { - t.Fatalf("Failed to create new service: %s", err) - } - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("unexpected condition (-want, +got) = %v", diff) - } -} - -func TestMakeServiceWithFailingOption(t *testing.T) { - imc := &v1beta1.KafkaChannel{ - ObjectMeta: metav1.ObjectMeta{ - Name: kcName, - Namespace: testNS, - }, - } - _, err := MakeK8sService(imc, func(svc *corev1.Service) error { return errors.New("test-induced failure") }) - if err == nil { - t.Fatalf("Expcted error from new service but got none") - } -}