Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
[Consolidated KafkaChannel] Set KafkaChannel consolidated dispatcher …
Browse files Browse the repository at this point in the history
…OwenrRef (#798) (#803)

* Set KafkaChannel consolidated dispatcher OwenrRef

Signed-off-by: Ahmed Abdalla <[email protected]>

* Remove the rest of superficial tests

Signed-off-by: Ahmed Abdalla <[email protected]>

* Use a k8s go-client to fetch controller deployment

Signed-off-by: Ahmed Abdalla <[email protected]>

* Refactor dispatcher reconciliation logic for easier reasoning

Signed-off-by: Ahmed Abdalla <[email protected]>

* Remove apps.kubernetes.io labels

* fix bad comment

* Use apimachinary wait package instead of manual backoff

* fix typo
  • Loading branch information
devguyio authored Aug 11, 2021
1 parent 1fc8e75 commit 7eee107
Show file tree
Hide file tree
Showing 12 changed files with 172 additions and 740 deletions.
1 change: 0 additions & 1 deletion config/channel/consolidated/500-dispatcher.yaml

This file was deleted.

2 changes: 2 additions & 0 deletions config/channel/consolidated/deployments/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ metadata:
name: kafka-ch-controller
namespace: knative-eventing
labels:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: controller
kafka.eventing.knative.dev/release: devel
spec:
replicas: 1
Expand Down
103 changes: 0 additions & 103 deletions config/channel/consolidated/deployments/dispatcher.yaml

This file was deleted.

83 changes: 67 additions & 16 deletions pkg/channel/consolidated/reconciler/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,22 @@ package controller

import (
"context"
"fmt"
"time"

"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
"knative.dev/eventing-kafka/pkg/channel/consolidated/status"
kafkamessagingv1beta1 "knative.dev/eventing-kafka/pkg/client/informers/externalversions/messaging/v1beta1"
kafkaChannelClient "knative.dev/eventing-kafka/pkg/client/injection/client"
"knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
kafkaChannelReconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
"k8s.io/utils/pointer"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingClient "knative.dev/eventing/pkg/client/injection/client"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
"knative.dev/pkg/client/injection/kube/informers/core/v1/service"
Expand All @@ -46,13 +44,24 @@ import (
"knative.dev/pkg/logging"
knativeReconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/system"

"knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
"knative.dev/eventing-kafka/pkg/channel/consolidated/status"
kafkamessagingv1beta1 "knative.dev/eventing-kafka/pkg/client/informers/externalversions/messaging/v1beta1"
kafkaChannelClient "knative.dev/eventing-kafka/pkg/client/injection/client"
"knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
kafkaChannelReconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
commonconfig "knative.dev/eventing-kafka/pkg/common/config"
)

const (
channelLabelKey = "messaging.knative.dev/channel"
channelLabelValue = "kafka-channel"
roleLabelKey = "messaging.knative.dev/role"
roleLabelValue = "dispatcher"
channelLabelKey = "messaging.knative.dev/channel"
channelLabelValue = "kafka-channel"
roleLabelKey = "messaging.knative.dev/role"
dispatcherRoleLabelValue = "dispatcher"
controllerRoleLabelValue = "controller"
interval = 100 * time.Millisecond
timeout = 5 * time.Minute
)

// NewController initializes the controller and is called by the generated code.
Expand All @@ -63,7 +72,7 @@ func NewController(
) *controller.Impl {
logger := logging.FromContext(ctx)
kafkaChannelInformer := kafkachannel.Get(ctx)
deploymentInformer := deployment.Get(ctx)
deploymentInformer := deploymentinformer.Get(ctx)
endpointsInformer := endpointsinformer.Get(ctx)
serviceAccountInformer := serviceaccount.Get(ctx)
roleBindingInformer := rolebinding.Get(ctx)
Expand Down Expand Up @@ -92,6 +101,13 @@ func NewController(
r.dispatcherImage = env.Image
r.dispatcherServiceAccount = env.DispatcherServiceAccount

// get the ref of the controller deployment
ownerRef, err := getControllerOwnerRef(ctx)
if err != nil {
logger.Fatalw("Could not determine the proper owner reference for the dispatcher deployment.", zap.Error(err))
}
r.controllerRef = *ownerRef

impl := kafkaChannelReconciler.NewImpl(ctx, r)

statusProber := status.NewProber(
Expand All @@ -118,7 +134,7 @@ func NewController(
}

// Get and Watch the Kakfa config map and dynamically update Kafka configuration.
err := commonconfig.InitializeKafkaConfigMapWatcher(ctx, cmw, logger, handleKafkaConfigMapChange, system.Namespace())
err = commonconfig.InitializeKafkaConfigMapWatcher(ctx, cmw, logger, handleKafkaConfigMapChange, system.Namespace())
if err != nil {
logger.Fatal("Failed To Initialize ConfigMap Watcher", zap.Error(err))
}
Expand Down Expand Up @@ -155,7 +171,7 @@ func NewController(
podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: knativeReconciler.ChainFilterFuncs(
knativeReconciler.LabelFilterFunc(channelLabelKey, channelLabelValue, false),
knativeReconciler.LabelFilterFunc(roleLabelKey, roleLabelValue, false),
knativeReconciler.LabelFilterFunc(roleLabelKey, dispatcherRoleLabelValue, false),
),
Handler: cache.ResourceEventHandlerFuncs{
// Cancel probing when a Pod is deleted
Expand All @@ -178,3 +194,38 @@ func getPodInformerEventHandler(ctx context.Context, logger *zap.SugaredLogger,
}
}
}

func getControllerOwnerRef(ctx context.Context) (*metav1.OwnerReference, error) {
logger := logging.FromContext(ctx)
ctrlDeploymentLabels := labels.Set{
channelLabelKey: channelLabelValue,
roleLabelKey: controllerRoleLabelValue,
}

ownerRef := metav1.OwnerReference{
APIVersion: "apps/v1",
Kind: "Deployment",
Controller: pointer.BoolPtr(true),
}
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
k8sClient := kubeclient.Get(ctx)
deploymentList, err := k8sClient.AppsV1().Deployments(system.Namespace()).List(ctx, metav1.ListOptions{LabelSelector: ctrlDeploymentLabels.String()})
if err != nil {
return true, fmt.Errorf("error listing KafkaChannel controller deployment labels %w", err)
} else if len(deploymentList.Items) == 0 {
// Simple exponential backoff
logger.Debugw("found zero KafkaChannel controller deployment matching labels. Retrying.", zap.String("namespace", system.Namespace()), zap.Any("selectors", ctrlDeploymentLabels.AsSelector()))
return false, nil
} else if len(deploymentList.Items) > 1 {
return true, fmt.Errorf("found an unexpected number of KafkaChannel controller deployment matching labels. Got: %d, Want: 1", len(deploymentList.Items))
}
d := deploymentList.Items[0]
ownerRef.Name = d.Name
ownerRef.UID = d.UID
return true, nil
})
if err != nil {
return nil, fmt.Errorf("failed to determine the deployment of the KafkaChannel controller based on labels. %w", err)
}
return &ownerRef, nil
}
Loading

0 comments on commit 7eee107

Please sign in to comment.