Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
add Kafka defaults, starting with KEDA annotations (#855) (#861)
Browse files Browse the repository at this point in the history
Co-authored-by: Steven DOng <[email protected]>

Co-authored-by: Steven DOng <[email protected]>
  • Loading branch information
lionelvillard and steven0711dong authored Sep 9, 2021
1 parent 9c718d5 commit 524d1b4
Show file tree
Hide file tree
Showing 18 changed files with 1,125 additions and 53 deletions.
34 changes: 20 additions & 14 deletions cmd/source/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,29 @@ import (
"context"
"os"

"knative.dev/eventing-kafka/pkg/apis/bindings"
"knative.dev/eventing-kafka/pkg/apis/sources"
"knative.dev/pkg/webhook/resourcesemantics/conversion"

"k8s.io/apimachinery/pkg/runtime/schema"

bindingsv1beta1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1"
sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"

"knative.dev/eventing-kafka/pkg/source/reconciler/binding"
"knative.dev/eventing-kafka/pkg/source/reconciler/source"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/logging"
"knative.dev/pkg/signals"
"knative.dev/pkg/webhook"
"knative.dev/pkg/webhook/certificates"
"knative.dev/pkg/webhook/psbinding"
"knative.dev/pkg/webhook/resourcesemantics"
"knative.dev/pkg/webhook/resourcesemantics/conversion"
"knative.dev/pkg/webhook/resourcesemantics/defaulting"
"knative.dev/pkg/webhook/resourcesemantics/validation"

"knative.dev/eventing-kafka/pkg/apis/bindings"
bindingsv1beta1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1"
"knative.dev/eventing-kafka/pkg/apis/sources"
kafkasourcedefaultconfig "knative.dev/eventing-kafka/pkg/apis/sources/config"
sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"
"knative.dev/eventing-kafka/pkg/source/reconciler/binding"
"knative.dev/eventing-kafka/pkg/source/reconciler/source"
)

const (
Expand All @@ -57,6 +58,15 @@ var types = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{
var callbacks = map[schema.GroupVersionKind]validation.Callback{}

func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
// Decorate contexts with the current state of the config.
kafkaStore := kafkasourcedefaultconfig.NewStore(logging.FromContext(ctx).Named("kafka-source-config-store"))
kafkaStore.WatchConfigs(cmw)

// Decorate contexts with the current state of the config.
ctxFunc := func(ctx context.Context) context.Context {
return kafkaStore.ToContext(ctx)
}

return defaulting.NewAdmissionController(ctx,

// Name of the resource webhook.
Expand All @@ -69,11 +79,7 @@ func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher
types,

// A function that infuses the context passed to Validate/SetDefaults with custom metadata.
func(ctx context.Context) context.Context {
// Here is where you would infuse the context with state
// (e.g. attach a store with configmap data)
return ctx
},
ctxFunc,

// Whether to disallow unknown fields.
true,
Expand Down
31 changes: 20 additions & 11 deletions cmd/source/mtcontroller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,31 @@ import (
"context"
"os"

"knative.dev/eventing-kafka/pkg/apis/bindings"
"knative.dev/eventing-kafka/pkg/apis/sources"
"knative.dev/pkg/webhook/resourcesemantics/conversion"

"k8s.io/apimachinery/pkg/runtime/schema"

bindingsv1beta1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1"
sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"

"knative.dev/eventing-kafka/pkg/source/reconciler/binding"
source "knative.dev/eventing-kafka/pkg/source/reconciler/mtsource"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/logging"
"knative.dev/pkg/signals"
"knative.dev/pkg/webhook"
"knative.dev/pkg/webhook/certificates"
"knative.dev/pkg/webhook/psbinding"
"knative.dev/pkg/webhook/resourcesemantics"
"knative.dev/pkg/webhook/resourcesemantics/defaulting"
"knative.dev/pkg/webhook/resourcesemantics/validation"

"knative.dev/eventing-kafka/pkg/apis/bindings"
bindingsv1beta1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1"
"knative.dev/eventing-kafka/pkg/apis/sources"
sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"
"knative.dev/eventing-kafka/pkg/source/reconciler/binding"

kafkasourcedefaultconfig "knative.dev/eventing-kafka/pkg/apis/sources/config"
)

const (
Expand All @@ -57,6 +60,15 @@ var types = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{
var callbacks = map[schema.GroupVersionKind]validation.Callback{}

func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
// Decorate contexts with the current state of the config.
kafkaStore := kafkasourcedefaultconfig.NewStore(logging.FromContext(ctx).Named("kafka-source-config-store"))
kafkaStore.WatchConfigs(cmw)

// Decorate contexts with the current state of the config.
ctxFunc := func(ctx context.Context) context.Context {
return kafkaStore.ToContext(ctx)
}

return defaulting.NewAdmissionController(ctx,

// Name of the resource webhook.
Expand All @@ -69,11 +81,7 @@ func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher
types,

// A function that infuses the context passed to Validate/SetDefaults with custom metadata.
func(ctx context.Context) context.Context {
// Here is where you would infuse the context with state
// (e.g. attach a store with configmap data)
return ctx
},
ctxFunc,

// Whether to disallow unknown fields.
true,
Expand All @@ -92,6 +100,7 @@ func NewValidationAdmissionController(ctx context.Context, cmw configmap.Watcher
// The resources to validate.
types,

// A function that infuses the context passed to Validate/SetDefaults with custom metadata.
// A function that infuses the context passed to Validate/SetDefaults with custom metadata.
func(ctx context.Context) context.Context {
// Here is where you would infuse the context with state
Expand Down
58 changes: 58 additions & 0 deletions config/source/common/configmaps/config-kafka-source-defaults.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright 2020 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka-source-defaults
namespace: knative-eventing
labels:
eventing.knative.dev/release: devel
annotations:
knative.dev/example-checksum: "b6ed351d"
data:
_example: |
################################
# #
# EXAMPLE CONFIGURATION #
# #
################################
# This block is not actually functional configuration,
# but serves to illustrate the available configuration
# options and document them in a way that is accessible
# to users that `kubectl edit` this config map.
#
# These sample configuration options may be copied out of
# this example block and unindented to be in the data block
# to actually change the configuration.
# autoscalingClass is the autoscaler class name to use.
# valid value: keda.autoscaling.knative.dev
# autoscalingClass: ""
# minScale is the minimum number of replicas to scale down to.
# minScale: "1"
# maxScale is the maximum number of replicas to scale up to.
# maxScale: "1"
# pollingInterval is the interval in seconds KEDA uses to poll metrics.
# pollingInterval: "30"
# cooldownPeriod is the period of time in seconds KEDA waits until it scales down.
# cooldownPeriod: "300"
# kafkaLagThreshold is the lag (ie. number of messages in a partition) threshold for KEDA to scale up sources.
# kafkaLagThreshold: "10"
1 change: 1 addition & 0 deletions config/source/multi/400-config-kafka-source-defaults.yaml
1 change: 1 addition & 0 deletions config/source/single/400-config-kafka-source-defaults.yaml
1 change: 1 addition & 0 deletions hack/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package tools

import (
_ "knative.dev/hack"
_ "knative.dev/pkg/configmap/hash-gen"
_ "knative.dev/pkg/hack"

// Test images from eventing
Expand Down
29 changes: 29 additions & 0 deletions hack/update-checksums.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/bin/env bash

# Copyright 2020 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -o errexit
set -o nounset
set -o pipefail

export GO111MODULE=on

if [ -z "${GOPATH:-}" ]; then
export GOPATH=$(go env GOPATH)
fi

source $(dirname $0)/../vendor/knative.dev/hack/library.sh

go run "${REPO_ROOT_DIR}/vendor/knative.dev/pkg/configmap/hash-gen" "${REPO_ROOT_DIR}"/config/source/common/configmaps/*.yaml
21 changes: 21 additions & 0 deletions pkg/apis/sources/config/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// +k8s:deepcopy-gen=package

// Package config holds the typed objects that define the schemas for
// ConfigMap objects that pertain to our API objects.
package config
Loading

0 comments on commit 524d1b4

Please sign in to comment.