From c16c41d1308a0bf36c453a50e6c75b2329fadcd7 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 19 Jul 2024 15:39:49 +0200 Subject: [PATCH 01/12] Add hook that is called before application is started --- streams-bootstrap-cli/build.gradle.kts | 5 ++--- .../com/bakdata/kafka/KafkaApplication.java | 17 ++++------------- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/streams-bootstrap-cli/build.gradle.kts b/streams-bootstrap-cli/build.gradle.kts index da3fc86c..15548118 100644 --- a/streams-bootstrap-cli/build.gradle.kts +++ b/streams-bootstrap-cli/build.gradle.kts @@ -7,9 +7,6 @@ plugins { dependencies { api(project(":streams-bootstrap-core")) api(group = "info.picocli", name = "picocli", version = "4.7.5") - val log4jVersion: String by project - implementation(group = "org.apache.logging.log4j", name = "log4j-core", version = log4jVersion) - implementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) val junitVersion: String by project testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion) @@ -31,4 +28,6 @@ dependencies { name = "schema-registry-mock-junit5", version = fluentKafkaVersion ) + val log4jVersion: String by project + testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) } diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index e0a1f84c..b76bd3e1 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -42,8 +42,6 @@ import lombok.Setter; import lombok.ToString; import lombok.extern.slf4j.Slf4j; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.core.config.Configurator; import picocli.CommandLine; import picocli.CommandLine.Command; import 
picocli.CommandLine.ParseResult; @@ -56,7 +54,6 @@ *
  • {@link #outputTopic}
  • *
  • {@link #extraOutputTopics}
  • *
  • {@link #brokers}
  • - *
  • {@link #debug}
  • *
  • {@link #schemaRegistryUrl}
  • *
  • {@link #kafkaConfig}
  • * @@ -92,8 +89,6 @@ public abstract class KafkaApplication extraOutputTopics = emptyMap(); @CommandLine.Option(names = "--brokers", required = true, description = "Broker addresses to connect to") private String brokers; - @CommandLine.Option(names = "--debug", arity = "0..1", description = "Configure logging to debug") - private boolean debug; @CommandLine.Option(names = "--schema-registry-url", description = "URL of Schema Registry") private String schemaRegistryUrl; @CommandLine.Option(names = "--kafka-config", split = ",", description = "Additional Kafka properties") @@ -299,20 +294,16 @@ public final CleanableApp createCleanableApp() { protected abstract CA createConfiguredApp(final A app, AppConfiguration configuration); /** - * Configure application when running in debug mode. By default, Log4j2 log level is configured to debug for - * {@code com.bakdata} and the applications package. + * Called before starting the application, e.g., invoking {@link #run()} */ - protected void configureDebug() { - Configurator.setLevel("com.bakdata", Level.DEBUG); - Configurator.setLevel(this.getClass().getPackageName(), Level.DEBUG); + protected void onApplicationStart() { + // do nothing by default } private void startApplication() { Runtime.getRuntime().addShutdownHook(new Thread(this::close)); + this.onApplicationStart(); log.info("Starting application"); - if (this.debug) { - this.configureDebug(); - } log.debug("Starting application: {}", this); } From 8f54ba541b1ebbf2ff1c4b96bc2aaae2a95aa4c7 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 19 Jul 2024 16:51:15 +0200 Subject: [PATCH 02/12] Update --- README.md | 4 ---- charts/producer-app-cleanup-job/templates/job.yaml | 4 ---- charts/producer-app-cleanup-job/values.yaml | 2 -- charts/producer-app/README.md | 1 - charts/producer-app/templates/pod.yaml | 4 ---- charts/producer-app/values.yaml | 2 -- charts/streams-app-cleanup-job/templates/job.yaml | 4 ---- charts/streams-app-cleanup-job/values.yaml | 2 -- 
charts/streams-app/README.md | 1 - charts/streams-app/templates/deployment.yaml | 4 ---- charts/streams-app/values.yaml | 2 -- 11 files changed, 30 deletions(-) diff --git a/README.md b/README.md index 91688f9d..6dd05694 100644 --- a/README.md +++ b/README.md @@ -125,8 +125,6 @@ The following configuration options are available: - `--volatile-group-instance-id`: Whether the group instance id is volatile, i.e., it will change on a Streams shutdown. -- `--debug`: Configure logging to debug - Additionally, the following commands are available: - `clean`: Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate @@ -188,8 +186,6 @@ The following configuration options are available: - `--extra-output-topics`: Additional named output topics (`String=String>[,...]`) -- `--debug`: Configure logging to debug - Additionally, the following commands are available: - `clean`: Delete all output topics associated with the Kafka Producer application. diff --git a/charts/producer-app-cleanup-job/templates/job.yaml b/charts/producer-app-cleanup-job/templates/job.yaml index 65f11c2d..cdc999e9 100644 --- a/charts/producer-app-cleanup-job/templates/job.yaml +++ b/charts/producer-app-cleanup-job/templates/job.yaml @@ -72,10 +72,6 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" value: {{ .Values.streams.schemaRegistryUrl | quote }} {{- end }} - {{- if hasKey .Values "debug" }} - - name: "{{ .Values.configurationEnvPrefix }}_DEBUG" - value: {{ .Values.debug | quote }} - {{- end }} {{- if hasKey .Values.streams "outputTopic" }} - name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC" value: {{ .Values.streams.outputTopic | quote }} diff --git a/charts/producer-app-cleanup-job/values.yaml b/charts/producer-app-cleanup-job/values.yaml index e0d956f7..10ba2322 100644 --- a/charts/producer-app-cleanup-job/values.yaml +++ b/charts/producer-app-cleanup-job/values.yaml @@ -29,8 +29,6 @@ streams: commandLine: {} # 
MY_CLI_PARAM: "foo-bar" -debug: false - env: {} # MY_ENV_VARIABLE: foo-bar diff --git a/charts/producer-app/README.md b/charts/producer-app/README.md index 2366b8db..d6a4574b 100644 --- a/charts/producer-app/README.md +++ b/charts/producer-app/README.md @@ -60,7 +60,6 @@ Alternatively, a YAML file that specifies the values for the parameters can be p | ------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | | `configurationEnvPrefix` | Prefix for environment variables to use that should be parsed as command line arguments. | `APP` | | `commandLine` | Map of command line arguments passed to the producer app. | `{}` | -| `debug` | Configure logging to debug | `false` | | `env` | Custom environment variables | `{}` | | `secrets` | Custom secret environment variables. Prefix with `configurationEnvPrefix` in order to pass secrets to command line or prefix with `STREAMS_` to pass secrets to Kafka Streams configuration. E.g., `APP_MY_PARAM` would be passed as `--my-param` and `STREAMS_MAX_POLL_TIMEOUT_MS` would be translated to `max.poll.timeout.ms`. | `{}` | | `secretRefs` | Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret `name` and `key`. 
| `{}` | diff --git a/charts/producer-app/templates/pod.yaml b/charts/producer-app/templates/pod.yaml index 772a716a..d697ee5b 100644 --- a/charts/producer-app/templates/pod.yaml +++ b/charts/producer-app/templates/pod.yaml @@ -56,10 +56,6 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" value: {{ .Values.streams.schemaRegistryUrl | quote }} {{- end }} - {{- if hasKey .Values "debug" }} - - name: "{{ .Values.configurationEnvPrefix }}_DEBUG" - value: {{ .Values.debug | quote }} - {{- end }} {{- if hasKey .Values.streams "outputTopic" }} - name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC" value: {{ .Values.streams.outputTopic | quote }} diff --git a/charts/producer-app/values.yaml b/charts/producer-app/values.yaml index 4f7480ab..d09628ad 100644 --- a/charts/producer-app/values.yaml +++ b/charts/producer-app/values.yaml @@ -62,8 +62,6 @@ streams: commandLine: {} # MY_CLI_PARAM: "foo-bar" -debug: false - env: {} # MY_ENV_VARIABLE: foo-bar diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml index 7b19f207..fd8ec5fa 100644 --- a/charts/streams-app-cleanup-job/templates/job.yaml +++ b/charts/streams-app-cleanup-job/templates/job.yaml @@ -76,10 +76,6 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" value: {{ .Values.streams.schemaRegistryUrl | quote }} {{- end }} - {{- if hasKey .Values "debug" }} - - name: "{{ .Values.configurationEnvPrefix }}_DEBUG" - value: {{ .Values.debug | quote }} - {{- end }} {{- if and (hasKey .Values.streams "inputTopics") (.Values.streams.inputTopics) }} - name: "{{ .Values.configurationEnvPrefix }}_INPUT_TOPICS" value: {{ .Values.streams.inputTopics | join "," | quote }} diff --git a/charts/streams-app-cleanup-job/values.yaml b/charts/streams-app-cleanup-job/values.yaml index b3464e2a..029bb730 100644 --- a/charts/streams-app-cleanup-job/values.yaml +++ b/charts/streams-app-cleanup-job/values.yaml @@ -41,8 +41,6 @@ 
streams: commandLine: {} # MY_CLI_PARAM: "foo-bar" -debug: false - env: {} # MY_ENV_VARIABLE: foo-bar # diff --git a/charts/streams-app/README.md b/charts/streams-app/README.md index 77fa5803..cd4c6795 100644 --- a/charts/streams-app/README.md +++ b/charts/streams-app/README.md @@ -71,7 +71,6 @@ Alternatively, a YAML file that specifies the values for the parameters can be p | ------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | | `configurationEnvPrefix` | Prefix for environment variables to use that should be parsed as command line arguments. | `APP` | | `commandLine` | Map of command line arguments passed to the streams app. | `{}` | -| `debug` | Configure logging to debug | `false` | | `env` | Custom environment variables | `{}` | | `secrets` | Custom secret environment variables. Prefix with `configurationEnvPrefix` in order to pass secrets to command line or prefix with `STREAMS_` to pass secrets to Kafka Streams configuration. E.g., `APP_MY_PARAM` would be passed as `--my-param` and `STREAMS_MAX_POLL_TIMEOUT_MS` would be translated to `max.poll.timeout.ms`. | `{}` | | `secretRefs` | Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret `name` and `key`. 
| `{}` | diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml index c13b080c..77e5ac7b 100644 --- a/charts/streams-app/templates/deployment.yaml +++ b/charts/streams-app/templates/deployment.yaml @@ -123,10 +123,6 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" value: {{ .Values.streams.schemaRegistryUrl | quote }} {{- end }} - {{- if hasKey .Values "debug" }} - - name: "{{ .Values.configurationEnvPrefix }}_DEBUG" - value: {{ .Values.debug | quote }} - {{- end }} {{- if and (hasKey .Values.streams "inputTopics") (.Values.streams.inputTopics) }} - name: "{{ .Values.configurationEnvPrefix }}_INPUT_TOPICS" value: {{ .Values.streams.inputTopics | join "," | quote }} diff --git a/charts/streams-app/values.yaml b/charts/streams-app/values.yaml index b3627e9d..ee6b8681 100644 --- a/charts/streams-app/values.yaml +++ b/charts/streams-app/values.yaml @@ -49,8 +49,6 @@ streams: commandLine: {} # MY_CLI_PARAM: "foo-bar" -debug: false - env: {} # MY_ENV_VARIABLE: foo-bar From a17d78674e109027b2be9a54c10aae834ad157dc Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 19 Jul 2024 16:52:21 +0200 Subject: [PATCH 03/12] Remove dependency on Avro --- streams-bootstrap-core/build.gradle.kts | 3 +- .../bakdata/kafka/ConfiguredProducerApp.java | 33 ++++++--------- .../bakdata/kafka/ConfiguredStreamsApp.java | 31 ++++++-------- .../java/com/bakdata/kafka/ProducerApp.java | 6 +++ .../java/com/bakdata/kafka/SerdeConfig.java | 40 +++++++++++++++++++ .../com/bakdata/kafka/SerializerConfig.java | 40 +++++++++++++++++++ .../java/com/bakdata/kafka/StreamsApp.java | 6 +++ 7 files changed, 118 insertions(+), 41 deletions(-) create mode 100644 streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java create mode 100644 streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts index 
4bd028b1..bdb53778 100644 --- a/streams-bootstrap-core/build.gradle.kts +++ b/streams-bootstrap-core/build.gradle.kts @@ -11,7 +11,7 @@ dependencies { api(group = "org.apache.kafka", name = "kafka-streams", version = kafkaVersion) api(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion) val confluentVersion: String by project - implementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) + implementation(group = "io.confluent", name = "kafka-schema-serializer", version = confluentVersion) api(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion) api( group = "org.slf4j", @@ -43,6 +43,7 @@ dependencies { testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) { exclude(group = "org.slf4j", module = "slf4j-log4j12") } + testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) val log4jVersion: String by project testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java index de2f409d..880af577 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java @@ -24,16 +24,12 @@ package com.bakdata.kafka; -import static java.util.Collections.emptyMap; - -import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import java.util.HashMap; import java.util.Map; import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; /** * A {@link ProducerApp} with a corresponding {@link 
AppConfiguration} @@ -45,17 +41,9 @@ public class ConfiguredProducerApp implements ConfiguredA private final @NonNull T app; private final @NonNull AppConfiguration configuration; - private static Map createBaseConfig(final KafkaEndpointConfig endpointConfig) { + private static Map createBaseConfig() { final Map kafkaConfig = new HashMap<>(); - if (endpointConfig.isSchemaRegistryConfigured()) { - kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class); - kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class); - } else { - kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - } - kafkaConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); kafkaConfig.put(ProducerConfig.ACKS_CONFIG, "all"); @@ -70,12 +58,6 @@ private static Map createBaseConfig(final KafkaEndpointConfig en * Configuration is created in the following order *
      *
    • - * {@link ProducerConfig#KEY_SERIALIZER_CLASS_CONFIG} and - * {@link ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG} are configured based on - * {@link KafkaEndpointConfig#isSchemaRegistryConfigured()}. - * If Schema Registry is configured, {@link SpecificAvroSerializer} is used, otherwise - * {@link StringSerializer} is used. - * Additionally, the following is configured: *
            * max.in.flight.requests.per.connection=1
            * acks=all
      @@ -95,6 +77,11 @@ private static Map createBaseConfig(final KafkaEndpointConfig en
            *     
    • * Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()} *
    • + *
    • + * {@link ProducerConfig#KEY_SERIALIZER_CLASS_CONFIG} and + * {@link ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG} is configured using + * {@link ProducerApp#defaultSerializerConfig()} + *
    • *
    * * @param endpointConfig endpoint to run app on @@ -102,7 +89,11 @@ private static Map createBaseConfig(final KafkaEndpointConfig en */ public Map getKafkaProperties(final KafkaEndpointConfig endpointConfig) { final KafkaPropertiesFactory propertiesFactory = this.createPropertiesFactory(endpointConfig); - return propertiesFactory.createKafkaProperties(emptyMap()); + final SerializerConfig serializerConfig = this.app.defaultSerializerConfig(); + return propertiesFactory.createKafkaProperties(Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializerConfig.getKeySerializer(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializerConfig.getValueSerializer() + )); } /** @@ -130,7 +121,7 @@ public void close() { } private KafkaPropertiesFactory createPropertiesFactory(final KafkaEndpointConfig endpointConfig) { - final Map baseConfig = createBaseConfig(endpointConfig); + final Map baseConfig = createBaseConfig(); return KafkaPropertiesFactory.builder() .baseConfig(baseConfig) .app(this.app) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java index efe0a3d2..7eb75d08 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -32,7 +31,6 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; @@ -46,17 +44,9 @@ public class ConfiguredStreamsApp implements ConfiguredApp private final @NonNull T app; private final 
@NonNull AppConfiguration configuration; - private static Map createBaseConfig(final KafkaEndpointConfig endpointConfig) { + private static Map createBaseConfig() { final Map kafkaConfig = new HashMap<>(); - if (endpointConfig.isSchemaRegistryConfigured()) { - kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); - kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); - } else { - kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); - kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); - } - // exactly once and order kafkaConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1); @@ -74,12 +64,7 @@ private static Map createBaseConfig(final KafkaEndpointConfig en * Configuration is created in the following order *
      *
    • - * {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG} are configured based on - * {@link KafkaEndpointConfig#isSchemaRegistryConfigured()}. - * If Schema Registry is configured, {@link SpecificAvroSerde} is used, otherwise {@link StringSerde} is - * used. - * Additionally, exactly-once, in-order, and compression are configured: + * Exactly-once, in-order, and compression are configured: *
            * processing.guarantee=exactly_once_v2
            * producer.max.in.flight.requests.per.connection=1
      @@ -104,6 +89,11 @@ private static Map createBaseConfig(final KafkaEndpointConfig en
            *         {@link StreamsConfig#APPLICATION_ID_CONFIG} is configured using
            *         {@link StreamsApp#getUniqueAppId(StreamsTopicConfig)}
            *     
    • + *
    • + * {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG} and + * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG} is configured using + * {@link StreamsApp#defaultSerdeConfig()} + *
    • *
    * * @param endpointConfig endpoint to run app on @@ -111,8 +101,11 @@ private static Map createBaseConfig(final KafkaEndpointConfig en */ public Map getKafkaProperties(final KafkaEndpointConfig endpointConfig) { final KafkaPropertiesFactory propertiesFactory = this.createPropertiesFactory(endpointConfig); + final SerdeConfig serdeConfig = this.app.defaultSerdeConfig(); return propertiesFactory.createKafkaProperties(Map.of( - StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId() + StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId(), + StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serdeConfig.getKeySerde(), + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdeConfig.getValueSerde() )); } @@ -169,7 +162,7 @@ public void close() { } private KafkaPropertiesFactory createPropertiesFactory(final KafkaEndpointConfig endpointConfig) { - final Map baseConfig = createBaseConfig(endpointConfig); + final Map baseConfig = createBaseConfig(); return KafkaPropertiesFactory.builder() .baseConfig(baseConfig) .app(this.app) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java index 42cee6e6..e5a0f7f0 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java @@ -24,6 +24,8 @@ package com.bakdata.kafka; +import org.apache.kafka.common.serialization.StringSerializer; + /** * Application that defines how to produce messages to Kafka and necessary configurations */ @@ -46,4 +48,8 @@ default ProducerCleanUpConfiguration setupCleanUp( final EffectiveAppConfiguration configuration) { return new ProducerCleanUpConfiguration(); } + + default SerializerConfig defaultSerializerConfig() { + return new SerializerConfig(StringSerializer.class, StringSerializer.class); + } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java 
b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java new file mode 100644 index 00000000..5fcd3fc2 --- /dev/null +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.bakdata.kafka; + +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.With; +import org.apache.kafka.common.serialization.Serde; + +@Getter +@RequiredArgsConstructor +@With +public class SerdeConfig { + + private final @NonNull Class keySerde; + private final @NonNull Class valueSerde; +} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java new file mode 100644 index 00000000..635f069b --- /dev/null +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.bakdata.kafka; + +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.With; +import org.apache.kafka.common.serialization.Serializer; + +@Getter +@RequiredArgsConstructor +@With +public class SerializerConfig { + + private final @NonNull Class keySerializer; + private final @NonNull Class valueSerializer; +} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsApp.java index 0802720a..c4c83ff7 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsApp.java @@ -24,6 +24,8 @@ package com.bakdata.kafka; +import org.apache.kafka.common.serialization.Serdes.StringSerde; + /** * Application that defines a Kafka Streams {@link org.apache.kafka.streams.Topology} and necessary configurations */ @@ -55,4 +57,8 @@ default StreamsCleanUpConfiguration setupCleanUp( final EffectiveAppConfiguration configuration) { return new StreamsCleanUpConfiguration(); } + + default SerdeConfig defaultSerdeConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } } From a0d273ac5752e6ab353c3afd3450cf9305f804a5 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 19 Jul 2024 20:00:59 +0200 Subject: [PATCH 04/12] Remove dependency on Avro --- README.md | 10 +++++ .../test/java/com/bakdata/kafka/CliTest.java | 40 +++++++++++++++++++ .../java/com/bakdata/kafka/CloseFlagApp.java | 6 +++ .../kafka/integration/RunProducerAppTest.java | 9 ++--- .../kafka/test_applications/Mirror.java | 7 ++++ .../kafka/test_applications/WordCount.java | 7 ++++ .../src/main/java/com/bakdata/kafka/App.java | 3 +- .../bakdata/kafka/ConfiguredProducerApp.java | 10 ++--- .../bakdata/kafka/ConfiguredStreamsApp.java | 15 +++---- .../bakdata/kafka/KafkaPropertiesFactory.java | 2 + .../java/com/bakdata/kafka/ProducerApp.java | 9 ++--- 
.../java/com/bakdata/kafka/SerdeConfig.java | 14 +++++-- .../bakdata/kafka/SerializationConfig.java | 33 +++++++++++++++ .../com/bakdata/kafka/SerializerConfig.java | 14 +++++-- .../java/com/bakdata/kafka/StreamsApp.java | 7 +--- .../kafka/ConfiguredProducerAppTest.java | 23 ++++------- .../kafka/ConfiguredStreamsAppTest.java | 25 ++++-------- .../kafka/ExecutableProducerAppTest.java | 6 +++ .../kafka/ExecutableStreamsAppTest.java | 6 +++ .../kafka/integration/StreamsRunnerTest.java | 11 ++++- .../test_applications/AvroKeyProducer.java | 10 ++--- .../test_applications/AvroValueProducer.java | 10 ++--- .../ComplexTopologyApplication.java | 10 ++--- .../test_applications/ExtraInputTopics.java | 7 ++++ .../kafka/test_applications/Mirror.java | 7 ++++ .../test_applications/MirrorKeyWithAvro.java | 10 ++--- .../MirrorValueWithAvro.java | 10 ++--- .../MirrorWithNonDefaultSerde.java | 10 ++--- .../test_applications/StringProducer.java | 7 ++++ .../kafka/test_applications/WordCount.java | 7 ++++ .../test_applications/WordCountPattern.java | 7 ++++ 31 files changed, 240 insertions(+), 112 deletions(-) create mode 100644 streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializationConfig.java diff --git a/README.md b/README.md index 6dd05694..df8b8d73 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,11 @@ public class StreamsBootstrapApplication extends KafkaStreamsApplication { return "streams-bootstrap-app-" + topics.getOutputTopic(); } + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } + // Optionally you can define custom Kafka properties @Override public Map createKafkaProperties() { @@ -162,6 +167,11 @@ public class StreamsBootstrapApplication extends KafkaProducerApplication { }; } + @Override + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, StringSerializer.class); + } + // Optionally you can define custom Kafka 
properties @Override public Map createKafkaProperties() { diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index cdede819..7f0c79b7 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -63,6 +63,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { throw new UnsupportedOperationException(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } }; } @@ -91,6 +96,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { throw new UnsupportedOperationException(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } }), new String[]{ "--brokers", "localhost:9092", "--schema-registry-url", "http://localhost:8081", @@ -115,6 +125,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { throw new UnsupportedOperationException(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } }; } @@ -147,6 +162,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { throw new UnsupportedOperationException(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } }; } @@ -179,6 +199,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return "app"; } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } })) { 
kafkaCluster.start(); kafkaCluster.createTopic(TopicConfig.withName(input).build()); @@ -210,6 +235,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return "app"; } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } })) { kafkaCluster.start(); kafkaCluster.createTopic(TopicConfig.withName(input).build()); @@ -249,6 +279,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { throw new UnsupportedOperationException(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } }; } }, new String[]{ @@ -275,6 +310,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { throw new UnsupportedOperationException(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } }; } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java index 6e928d43..640328eb 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java @@ -27,6 +27,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor @@ -57,6 +58,11 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { return CloseFlagApp.this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } + @Override public void close() { 
CloseFlagApp.this.appClosed = true; diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index d460658b..67232632 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -32,11 +32,13 @@ import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; +import com.bakdata.kafka.SerializerConfig; import com.bakdata.kafka.SimpleKafkaProducerApplication; import com.bakdata.kafka.TestRecord; import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import java.util.Map; import java.util.concurrent.TimeUnit; import net.mguenther.kafka.junit.EmbeddedKafkaCluster; @@ -44,7 +46,6 @@ import net.mguenther.kafka.junit.TopicConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -85,10 +86,8 @@ public ProducerRunnable buildRunnable(final ProducerBuilder builder) { } @Override - public Map createKafkaProperties() { - return Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class - ); + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class); } })) { app.setBrokers(this.kafkaCluster.getBrokerList()); diff --git 
a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/Mirror.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/Mirror.java index 212c7611..54aee24c 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/Mirror.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/Mirror.java @@ -24,10 +24,12 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; import lombok.NoArgsConstructor; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor @@ -43,4 +45,9 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } + } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/WordCount.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/WordCount.java index 2bcdc095..58f6d7af 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/WordCount.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/WordCount.java @@ -24,6 +24,7 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; @@ -31,6 +32,7 @@ import java.util.regex.Pattern; import lombok.NoArgsConstructor; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; import 
org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -56,4 +58,9 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/App.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/App.java index 65ed48bb..42f0dae2 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/App.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/App.java @@ -33,7 +33,6 @@ * @param type of topic config * @param type of clean up config */ -@FunctionalInterface public interface App extends AutoCloseable { /** @@ -63,4 +62,6 @@ default Map createKafkaProperties() { default void setup(final EffectiveAppConfiguration configuration) { // do nothing by default } + + SerializationConfig defaultSerializationConfig(); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java index 880af577..60a07d86 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java @@ -24,6 +24,8 @@ package com.bakdata.kafka; +import static java.util.Collections.emptyMap; + import java.util.HashMap; import java.util.Map; import lombok.Getter; @@ -80,7 +82,7 @@ private static Map createBaseConfig() { *
  • * {@link ProducerConfig#KEY_SERIALIZER_CLASS_CONFIG} and * {@link ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG} is configured using - * {@link ProducerApp#defaultSerializerConfig()} + * {@link ProducerApp#defaultSerializationConfig()} *
  • * * @@ -89,11 +91,7 @@ private static Map createBaseConfig() { */ public Map getKafkaProperties(final KafkaEndpointConfig endpointConfig) { final KafkaPropertiesFactory propertiesFactory = this.createPropertiesFactory(endpointConfig); - final SerializerConfig serializerConfig = this.app.defaultSerializerConfig(); - return propertiesFactory.createKafkaProperties(Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializerConfig.getKeySerializer(), - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializerConfig.getValueSerializer() - )); + return propertiesFactory.createKafkaProperties(emptyMap()); } /** diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java index 7eb75d08..5d77aa40 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java @@ -86,13 +86,13 @@ private static Map createBaseConfig() { * Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()} * *
  • - * {@link StreamsConfig#APPLICATION_ID_CONFIG} is configured using - * {@link StreamsApp#getUniqueAppId(StreamsTopicConfig)} - *
  • - *
  • * {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG} and * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG} is configured using - * {@link StreamsApp#defaultSerdeConfig()} + * {@link StreamsApp#defaultSerializationConfig()} + *
  • + *
  • + * {@link StreamsConfig#APPLICATION_ID_CONFIG} is configured using + * {@link StreamsApp#getUniqueAppId(StreamsTopicConfig)} *
  • * * @@ -101,11 +101,8 @@ private static Map createBaseConfig() { */ public Map getKafkaProperties(final KafkaEndpointConfig endpointConfig) { final KafkaPropertiesFactory propertiesFactory = this.createPropertiesFactory(endpointConfig); - final SerdeConfig serdeConfig = this.app.defaultSerdeConfig(); return propertiesFactory.createKafkaProperties(Map.of( - StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId(), - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serdeConfig.getKeySerde(), - StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdeConfig.getValueSerde() + StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId() )); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java index aecd5dc2..51d07e5d 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java @@ -40,9 +40,11 @@ class KafkaPropertiesFactory { Map createKafkaProperties(final Map configOverrides) { final Map kafkaConfig = new HashMap<>(this.baseConfig); kafkaConfig.putAll(this.app.createKafkaProperties()); + final SerializationConfig serializationConfig = this.app.defaultSerializationConfig(); kafkaConfig.putAll(EnvironmentStreamsConfigParser.parseVariables(System.getenv())); kafkaConfig.putAll(this.configuration.getKafkaConfig()); kafkaConfig.putAll(this.endpointConfig.createKafkaProperties()); + kafkaConfig.putAll(serializationConfig.createProperties()); kafkaConfig.putAll(configOverrides); return Collections.unmodifiableMap(kafkaConfig); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java index e5a0f7f0..ca8005dc 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java +++ 
b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java @@ -24,12 +24,9 @@ package com.bakdata.kafka; -import org.apache.kafka.common.serialization.StringSerializer; - /** * Application that defines how to produce messages to Kafka and necessary configurations */ -@FunctionalInterface public interface ProducerApp extends App { /** @@ -49,7 +46,7 @@ default ProducerCleanUpConfiguration setupCleanUp( return new ProducerCleanUpConfiguration(); } - default SerializerConfig defaultSerializerConfig() { - return new SerializerConfig(StringSerializer.class, StringSerializer.class); - } + @Override + SerializerConfig defaultSerializationConfig(); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java index 5fcd3fc2..35c5b115 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java @@ -24,17 +24,25 @@ package com.bakdata.kafka; -import lombok.Getter; +import java.util.Map; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.With; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.StreamsConfig; -@Getter @RequiredArgsConstructor @With -public class SerdeConfig { +public class SerdeConfig implements SerializationConfig { private final @NonNull Class keySerde; private final @NonNull Class valueSerde; + + @Override + public Map createProperties() { + return Map.of( + StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, this.keySerde, + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, this.valueSerde + ); + } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializationConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializationConfig.java new file mode 100644 index 00000000..63390712 --- /dev/null +++ 
b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializationConfig.java @@ -0,0 +1,33 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.bakdata.kafka; + +import java.util.Map; + +@FunctionalInterface +public interface SerializationConfig { + + Map createProperties(); +} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java index 635f069b..bf07484b 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java @@ -24,17 +24,25 @@ package com.bakdata.kafka; -import lombok.Getter; +import java.util.Map; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.With; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serializer; -@Getter @RequiredArgsConstructor @With -public class SerializerConfig { +public class SerializerConfig implements SerializationConfig { private final @NonNull Class keySerializer; private final @NonNull Class valueSerializer; + + @Override + public Map createProperties() { + return Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, this.keySerializer, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.valueSerializer + ); + } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsApp.java index c4c83ff7..9adde556 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsApp.java @@ -24,8 +24,6 @@ package com.bakdata.kafka; -import org.apache.kafka.common.serialization.Serdes.StringSerde; - /** * Application that defines a Kafka Streams {@link org.apache.kafka.streams.Topology} and necessary configurations */ @@ -58,7 +56,6 @@ default StreamsCleanUpConfiguration setupCleanUp( return new StreamsCleanUpConfiguration(); } - default SerdeConfig defaultSerdeConfig() { - return new 
SerdeConfig(StringSerde.class, StringSerde.class); - } + @Override + SerdeConfig defaultSerializationConfig(); } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java index 68314c09..2f2c1613 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java @@ -28,8 +28,8 @@ import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; import static org.assertj.core.api.Assertions.assertThat; -import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import java.util.Map; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; import org.junitpioneer.jupiter.SetEnvironmentVariable; @@ -76,7 +76,7 @@ void shouldPrioritizeEnvironmentConfigs() { } @Test - void shouldSetDefaultAvroSerializerWhenSchemaRegistryUrlIsSet() { + void shouldSetDefaultSerializer() { final AppConfiguration configuration = newAppConfiguration(); final ConfiguredProducerApp configuredApp = new ConfiguredProducerApp<>(new TestProducer(), configuration); @@ -84,20 +84,8 @@ void shouldSetDefaultAvroSerializerWhenSchemaRegistryUrlIsSet() { .brokers("fake") .schemaRegistryUrl("fake") .build())) - .containsEntry(KEY_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .containsEntry(VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class); - } - - @Test - void shouldSetDefaultStringSerializerWhenSchemaRegistryUrlIsNotSet() { - final AppConfiguration configuration = newAppConfiguration(); - final ConfiguredProducerApp configuredApp = - new ConfiguredProducerApp<>(new TestProducer(), configuration); - assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() - .brokers("fake") - 
.build())) .containsEntry(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .containsEntry(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + .containsEntry(VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); } private static class TestProducer implements ProducerApp { @@ -114,5 +102,10 @@ public Map createKafkaProperties() { "hello", "world" ); } + + @Override + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, LongSerializer.class); + } } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java index c9037bc4..4f1c9882 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java @@ -24,13 +24,12 @@ package com.bakdata.kafka; -import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; import static org.assertj.core.api.Assertions.assertThat; -import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import java.util.Map; +import org.apache.kafka.common.serialization.Serdes.LongSerde; import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.junit.jupiter.api.Test; import org.junitpioneer.jupiter.SetEnvironmentVariable; @@ -77,7 +76,7 @@ void shouldPrioritizeEnvironmentConfigs() { } @Test - void shouldSetDefaultAvroSerdeWhenSchemaRegistryUrlIsSet() { + void shouldSetDefaultSerde() { final AppConfiguration configuration = newAppConfiguration(); final ConfiguredStreamsApp configuredApp = new ConfiguredStreamsApp<>(new TestApplication(), configuration); @@ -85,21 +84,8 @@ void 
shouldSetDefaultAvroSerdeWhenSchemaRegistryUrlIsSet() { .brokers("fake") .schemaRegistryUrl("fake") .build())) - .containsEntry(DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class) - .containsEntry(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class) - .containsEntry(SCHEMA_REGISTRY_URL_CONFIG, "fake"); - } - - @Test - void shouldSetDefaultStringSerdeWhenSchemaRegistryUrlIsNotSet() { - final AppConfiguration configuration = newAppConfiguration(); - final ConfiguredStreamsApp configuredApp = - new ConfiguredStreamsApp<>(new TestApplication(), configuration); - assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() - .brokers("fake") - .build())) .containsEntry(DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class) - .containsEntry(DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); + .containsEntry(DEFAULT_VALUE_SERDE_CLASS_CONFIG, LongSerde.class); } private static class TestApplication implements StreamsApp { @@ -121,5 +107,10 @@ public Map createKafkaProperties() { "hello", "world" ); } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, LongSerde.class); + } } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java index 03fbe575..61108d65 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.function.Consumer; import java.util.function.Supplier; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -114,5 +115,10 @@ public ProducerCleanUpConfiguration setupCleanUp( public ProducerRunnable buildRunnable(final ProducerBuilder builder) 
{ return () -> {}; } + + @Override + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(ByteArraySerializer.class, ByteArraySerializer.class); + } } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java index c1ff3fbf..0fc4e58c 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.function.Consumer; import java.util.function.Supplier; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -124,5 +125,10 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return "foo"; } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java index 555290e1..a65e729c 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java @@ -31,6 +31,7 @@ import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.ConfiguredStreamsApp; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsExecutionOptions; import com.bakdata.kafka.StreamsRunner; @@ -49,6 +50,7 @@ import net.mguenther.kafka.junit.TopicConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import 
org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams.State; @@ -92,8 +94,8 @@ static Thread run(final StreamsRunner runner) { static ConfiguredStreamsApp configureApp(final StreamsApp app, final StreamsTopicConfig topics) { final AppConfiguration configuration = new AppConfiguration<>(topics, Map.of( - StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0", - ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" + StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0", + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" )); return new ConfiguredStreamsApp<>(app, configuration); } @@ -250,5 +252,10 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroKeyProducer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroKeyProducer.java index 5848accc..739912f9 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroKeyProducer.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroKeyProducer.java @@ -27,10 +27,10 @@ import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; +import com.bakdata.kafka.SerializerConfig; import com.bakdata.kafka.TestRecord; -import java.util.Map; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import org.apache.kafka.clients.producer.Producer; -import 
org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; @@ -46,9 +46,7 @@ public ProducerRunnable buildRunnable(final ProducerBuilder builder) { } @Override - public Map createKafkaProperties() { - return Map.of( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class - ); + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(SpecificAvroSerializer.class, StringSerializer.class); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroValueProducer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroValueProducer.java index 64356ce2..1b373284 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroValueProducer.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroValueProducer.java @@ -27,10 +27,10 @@ import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; +import com.bakdata.kafka.SerializerConfig; import com.bakdata.kafka.TestRecord; -import java.util.Map; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; @@ -46,9 +46,7 @@ public ProducerRunnable buildRunnable(final ProducerBuilder builder) { } @Override - public Map createKafkaProperties() { - return Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class - ); + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class); } } diff --git 
a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java index d321e3ca..4f5b2680 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java @@ -24,17 +24,16 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import java.time.Duration; -import java.util.Map; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -71,10 +70,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { } @Override - public Map createKafkaProperties() { - return Map.of( - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class, - StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class - ); + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, SpecificAvroSerde.class); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ExtraInputTopics.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ExtraInputTopics.java index 88c7d0e3..41e192c1 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ExtraInputTopics.java +++ 
b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ExtraInputTopics.java @@ -24,10 +24,12 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; import lombok.NoArgsConstructor; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor @@ -42,4 +44,9 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/Mirror.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/Mirror.java index 212c7611..54aee24c 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/Mirror.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/Mirror.java @@ -24,10 +24,12 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; import lombok.NoArgsConstructor; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor @@ -43,4 +45,9 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } + } diff --git 
a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java index 67056d93..6f303f2e 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java @@ -24,15 +24,14 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; -import java.util.Map; import lombok.NoArgsConstructor; import org.apache.kafka.common.serialization.Serdes.StringSerde; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor @@ -49,10 +48,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { } @Override - public Map createKafkaProperties() { - return Map.of( - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class, - StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class - ); + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(SpecificAvroSerde.class, StringSerde.class); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java index a2ee1aa8..fb8087d8 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java @@ -24,15 +24,14 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import 
com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; -import java.util.Map; import lombok.NoArgsConstructor; import org.apache.kafka.common.serialization.Serdes.StringSerde; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor @@ -49,10 +48,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { } @Override - public Map createKafkaProperties() { - return Map.of( - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class, - StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class - ); + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, SpecificAvroSerde.class); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java index 846e851f..7406b484 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java @@ -25,16 +25,15 @@ package com.bakdata.kafka.test_applications; import com.bakdata.kafka.Configurator; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; -import java.util.Map; import lombok.NoArgsConstructor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes.StringSerde; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Consumed; 
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; @@ -66,10 +65,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { } @Override - public Map createKafkaProperties() { - return Map.of( - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class, - StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class - ); + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/StringProducer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/StringProducer.java index 10de4059..296bf8d0 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/StringProducer.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/StringProducer.java @@ -27,8 +27,10 @@ import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; +import com.bakdata.kafka.SerializerConfig; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; public class StringProducer implements ProducerApp { @Override @@ -39,4 +41,9 @@ public ProducerRunnable buildRunnable(final ProducerBuilder builder) { } }; } + + @Override + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, StringSerializer.class); + } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCount.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCount.java index 2bcdc095..58f6d7af 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCount.java +++ 
b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCount.java @@ -24,6 +24,7 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; @@ -31,6 +32,7 @@ import java.util.regex.Pattern; import lombok.NoArgsConstructor; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -56,4 +58,9 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCountPattern.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCountPattern.java index 9c86b0b8..16b8f739 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCountPattern.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCountPattern.java @@ -24,6 +24,7 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; @@ -32,6 +33,7 @@ import lombok.NoArgsConstructor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; import 
org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -58,4 +60,9 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } } From 4dc272e37db49eddddc52e6d37d3bdb5fb279fe2 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 19 Jul 2024 20:20:03 +0200 Subject: [PATCH 05/12] Remove dependency on Avro --- .../src/main/java/com/bakdata/kafka/App.java | 4 ++++ .../main/java/com/bakdata/kafka/KafkaEndpointConfig.java | 9 +-------- .../src/main/java/com/bakdata/kafka/ProducerApp.java | 1 - .../src/main/java/com/bakdata/kafka/SerdeConfig.java | 3 +++ .../main/java/com/bakdata/kafka/SerializationConfig.java | 7 +++++++ .../main/java/com/bakdata/kafka/SerializerConfig.java | 3 +++ 6 files changed, 18 insertions(+), 9 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/App.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/App.java index 42f0dae2..d83fb757 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/App.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/App.java @@ -63,5 +63,9 @@ default void setup(final EffectiveAppConfiguration configuration) { // do nothing by default } + /** + * Configure default serialization behavior + * @return {@code SerializationConfig} + */ SerializationConfig defaultSerializationConfig(); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java index 5aa42876..b9552ac4 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java +++ 
b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java @@ -52,17 +52,10 @@ public class KafkaEndpointConfig { public Map createKafkaProperties() { final Map kafkaConfig = new HashMap<>(); kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokers); - if (this.isSchemaRegistryConfigured()) { + if (this.schemaRegistryUrl != null) { kafkaConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl); } return Collections.unmodifiableMap(kafkaConfig); } - /** - * Check if schema registry has been configured - * @return true if {@link #schemaRegistryUrl} has been configured - */ - public boolean isSchemaRegistryConfigured() { - return this.schemaRegistryUrl != null; - } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java index ca8005dc..e9113274 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java @@ -46,7 +46,6 @@ default ProducerCleanUpConfiguration setupCleanUp( return new ProducerCleanUpConfiguration(); } - @Override @Override SerializerConfig defaultSerializationConfig(); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java index 35c5b115..70e947d1 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java @@ -31,6 +31,9 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsConfig; +/** + * Defines how to (de-)serialize the data in a Kafka Streams app + */ @RequiredArgsConstructor @With public class SerdeConfig implements SerializationConfig { diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializationConfig.java 
b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializationConfig.java index 63390712..0c63b693 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializationConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializationConfig.java @@ -26,8 +26,15 @@ import java.util.Map; +/** + * Defines how to (de-)serialize the data in a Kafka client + */ @FunctionalInterface public interface SerializationConfig { + /** + * Create properties from this {@code SerializationConfig} + * @return Map of serialization configurations + */ Map createProperties(); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java index bf07484b..13386621 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java @@ -31,6 +31,9 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serializer; +/** + * Defines how to serialize the data in a Kafka producer + */ @RequiredArgsConstructor @With public class SerializerConfig implements SerializationConfig { From 5958d8050648502d2e3044d7ecc3fa3792d1d03a Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 19 Jul 2024 20:29:21 +0200 Subject: [PATCH 06/12] Remove dependency on Avro --- README.md | 12 ++++++++---- .../src/test/java/com/bakdata/kafka/CliTest.java | 3 ++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index df8b8d73..ea027e22 100644 --- a/README.md +++ b/README.md @@ -58,15 +58,17 @@ and `getUniqueAppId()`. 
You can define the topology of your application in `buil ```java import com.bakdata.kafka.KafkaStreamsApplication; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; import java.util.Map; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; -public class StreamsBootstrapApplication extends KafkaStreamsApplication { +public class MyStreamsApplication extends KafkaStreamsApplication { public static void main(final String[] args) { - startApplication(new StreamsBootstrapApplication(), args); + startApplication(new MyStreamsApplication(), args); } @Override @@ -147,12 +149,14 @@ import com.bakdata.kafka.KafkaProducerApplication; import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; +import com.bakdata.kafka.SerializerConfig; import java.util.Map; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.StringSerializer; -public class StreamsBootstrapApplication extends KafkaProducerApplication { +public class MyProducerApplication extends KafkaProducerApplication { public static void main(final String[] args) { - startApplication(new StreamsBootstrapApplication(), args); + startApplication(new MyProducerApplication(), args); } @Override diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index 7f0c79b7..88dad30a 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -38,6 +38,7 @@ import net.mguenther.kafka.junit.SendKeyValues; import net.mguenther.kafka.junit.TopicConfig; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import 
org.apache.kafka.streams.kstream.Consumed; import org.junit.jupiter.api.Test; @@ -238,7 +239,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { @Override public SerdeConfig defaultSerializationConfig() { - throw new UnsupportedOperationException(); + return new SerdeConfig(StringSerde.class, StringSerde.class); } })) { kafkaCluster.start(); From 18effda85ace1326a4e749b368d659974532f0d4 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 23 Jul 2024 12:47:10 +0200 Subject: [PATCH 07/12] Update --- .../bakdata/kafka/KafkaPropertiesFactory.java | 45 +++++++++++++++---- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java index 51d07e5d..e52aaa0d 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java @@ -38,14 +38,41 @@ class KafkaPropertiesFactory { private final @NonNull KafkaEndpointConfig endpointConfig; Map createKafkaProperties(final Map configOverrides) { - final Map kafkaConfig = new HashMap<>(this.baseConfig); - kafkaConfig.putAll(this.app.createKafkaProperties()); - final SerializationConfig serializationConfig = this.app.defaultSerializationConfig(); - kafkaConfig.putAll(EnvironmentStreamsConfigParser.parseVariables(System.getenv())); - kafkaConfig.putAll(this.configuration.getKafkaConfig()); - kafkaConfig.putAll(this.endpointConfig.createKafkaProperties()); - kafkaConfig.putAll(serializationConfig.createProperties()); - kafkaConfig.putAll(configOverrides); - return Collections.unmodifiableMap(kafkaConfig); + return new Task().createKafkaProperties(configOverrides); + } + + private class Task { + private final Map kafkaConfig = new HashMap<>(); + + private Map createKafkaProperties(final Map configOverrides) { + 
this.putAll(KafkaPropertiesFactory.this.app.createKafkaProperties()); + final SerializationConfig serializationConfig = + KafkaPropertiesFactory.this.app.defaultSerializationConfig(); + this.putAll(EnvironmentStreamsConfigParser.parseVariables(System.getenv())); + this.putAll(KafkaPropertiesFactory.this.configuration.getKafkaConfig()); + this.putAll(KafkaPropertiesFactory.this.endpointConfig.createKafkaProperties()); + this.putAll(serializationConfig.createProperties()); + this.putAllValidating(configOverrides); + return Collections.unmodifiableMap(this.kafkaConfig); + } + + private void putAllValidating(final Map configs) { + this.validateNotSet(configs); + this.putAll(configs); + } + + private void putAll(final Map configs) { + this.kafkaConfig.putAll(configs); + } + + private void validateNotSet(final Map configs) { + configs.keySet().forEach(this::validateNotSet); + } + + private void validateNotSet(final String key) { + if (this.kafkaConfig.containsKey(key)) { + throw new IllegalArgumentException(String.format("'%s' has been configured already", key)); + } + } } } From 1798b849fe94f2dc7f40d0c8456984cb80152d29 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 23 Jul 2024 12:48:11 +0200 Subject: [PATCH 08/12] Update --- .../src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java index e52aaa0d..81b52430 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java @@ -45,6 +45,7 @@ private class Task { private final Map kafkaConfig = new HashMap<>(); private Map createKafkaProperties(final Map configOverrides) { + this.putAll(KafkaPropertiesFactory.this.baseConfig); 
this.putAll(KafkaPropertiesFactory.this.app.createKafkaProperties()); final SerializationConfig serializationConfig = KafkaPropertiesFactory.this.app.defaultSerializationConfig(); From c7468199ba37497c7946f685b98590f7b4ee9a93 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 23 Jul 2024 12:54:13 +0200 Subject: [PATCH 09/12] Update --- .../bakdata/kafka/KafkaPropertiesFactory.java | 4 +- .../kafka/ConfiguredStreamsAppTest.java | 48 +++++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java index 81b52430..7a08aea7 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java @@ -52,7 +52,7 @@ private Map createKafkaProperties(final Map conf this.putAll(EnvironmentStreamsConfigParser.parseVariables(System.getenv())); this.putAll(KafkaPropertiesFactory.this.configuration.getKafkaConfig()); this.putAll(KafkaPropertiesFactory.this.endpointConfig.createKafkaProperties()); - this.putAll(serializationConfig.createProperties()); + this.putAllValidating(serializationConfig.createProperties()); this.putAllValidating(configOverrides); return Collections.unmodifiableMap(this.kafkaConfig); } @@ -72,7 +72,7 @@ private void validateNotSet(final Map configs) { private void validateNotSet(final String key) { if (this.kafkaConfig.containsKey(key)) { - throw new IllegalArgumentException(String.format("'%s' has been configured already", key)); + throw new IllegalArgumentException(String.format("'%s' should not be configured already", key)); } } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java index 4f1c9882..8ccbd8bc 100644 --- 
a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java @@ -27,10 +27,13 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.Map; +import org.apache.kafka.common.serialization.Serdes.ByteArraySerde; import org.apache.kafka.common.serialization.Serdes.LongSerde; import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.streams.StreamsConfig; import org.junit.jupiter.api.Test; import org.junitpioneer.jupiter.SetEnvironmentVariable; @@ -88,6 +91,51 @@ void shouldSetDefaultSerde() { .containsEntry(DEFAULT_VALUE_SERDE_CLASS_CONFIG, LongSerde.class); } + @Test + void shouldThrowIfKeySerdeHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + DEFAULT_KEY_SERDE_CLASS_CONFIG, ByteArraySerde.class + )); + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp<>(new TestApplication(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'default.key.serde' should not be configured already"); + } + + @Test + void shouldThrowIfValueSerdeHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + DEFAULT_VALUE_SERDE_CLASS_CONFIG, ByteArraySerde.class + )); + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp<>(new TestApplication(), configuration); + assertThatThrownBy(() -> 
configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'default.value.serde' should not be configured already"); + } + + @Test + void shouldThrowIfAppIdHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + StreamsConfig.APPLICATION_ID_CONFIG, "my-app" + )); + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp<>(new TestApplication(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'application.id' should not be configured already"); + } + private static class TestApplication implements StreamsApp { @Override From caa3bcbabdabfbc8d8bad944fa69f6991b56cefe Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 23 Jul 2024 12:54:49 +0200 Subject: [PATCH 10/12] Update --- .../main/java/com/bakdata/kafka/KafkaPropertiesFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java index 7a08aea7..9973bbef 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java @@ -47,11 +47,11 @@ private class Task { private Map createKafkaProperties(final Map configOverrides) { this.putAll(KafkaPropertiesFactory.this.baseConfig); this.putAll(KafkaPropertiesFactory.this.app.createKafkaProperties()); - final SerializationConfig serializationConfig = - KafkaPropertiesFactory.this.app.defaultSerializationConfig(); 
this.putAll(EnvironmentStreamsConfigParser.parseVariables(System.getenv())); this.putAll(KafkaPropertiesFactory.this.configuration.getKafkaConfig()); this.putAll(KafkaPropertiesFactory.this.endpointConfig.createKafkaProperties()); + final SerializationConfig serializationConfig = + KafkaPropertiesFactory.this.app.defaultSerializationConfig(); this.putAllValidating(serializationConfig.createProperties()); this.putAllValidating(configOverrides); return Collections.unmodifiableMap(this.kafkaConfig); From c2b468b5fd4c57564d581c989a146fb3da011be5 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Wed, 24 Jul 2024 08:00:53 +0200 Subject: [PATCH 11/12] Update --- .../bakdata/kafka/KafkaPropertiesFactory.java | 2 +- .../kafka/ConfiguredProducerAppTest.java | 64 +++++++++++++++++++ .../kafka/ConfiguredStreamsAppTest.java | 31 +++++++++ 3 files changed, 96 insertions(+), 1 deletion(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java index 9973bbef..bdb88182 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java @@ -49,7 +49,7 @@ private Map createKafkaProperties(final Map conf this.putAll(KafkaPropertiesFactory.this.app.createKafkaProperties()); this.putAll(EnvironmentStreamsConfigParser.parseVariables(System.getenv())); this.putAll(KafkaPropertiesFactory.this.configuration.getKafkaConfig()); - this.putAll(KafkaPropertiesFactory.this.endpointConfig.createKafkaProperties()); + this.putAllValidating(KafkaPropertiesFactory.this.endpointConfig.createKafkaProperties()); final SerializationConfig serializationConfig = KafkaPropertiesFactory.this.app.defaultSerializationConfig(); this.putAllValidating(serializationConfig.createProperties()); diff --git 
a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java index 2f2c1613..71a988ef 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java @@ -27,8 +27,12 @@ import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import java.util.Map; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; @@ -88,6 +92,66 @@ void shouldSetDefaultSerializer() { .containsEntry(VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); } + @Test + void shouldThrowIfKeySerializerHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class + )); + final ConfiguredProducerApp configuredApp = + new ConfiguredProducerApp<>(new TestProducer(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'default.key.serde' should not be configured already"); + } + + @Test + void shouldThrowIfValueSerializerHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new 
AppConfiguration<>(emptyTopicConfig(), Map.of( + VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class + )); + final ConfiguredProducerApp configuredApp = + new ConfiguredProducerApp<>(new TestProducer(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'default.value.serde' should not be configured already"); + } + + @Test + void shouldThrowIfBootstrapServersHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka" + )); + final ConfiguredProducerApp configuredApp = + new ConfiguredProducerApp<>(new TestProducer(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'bootstrap.servers' should not be configured already"); + } + + @Test + void shouldThrowIfSchemaRegistryHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "my-schema-registry" + )); + final ConfiguredProducerApp configuredApp = + new ConfiguredProducerApp<>(new TestProducer(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'schema.registry.url' should not be configured already"); + } + private static class TestProducer implements ProducerApp { @Override diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java 
b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java index 8ccbd8bc..78825ba8 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java @@ -29,6 +29,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import java.util.Map; import org.apache.kafka.common.serialization.Serdes.ByteArraySerde; import org.apache.kafka.common.serialization.Serdes.LongSerde; @@ -136,6 +137,36 @@ void shouldThrowIfAppIdHasBeenConfiguredDifferently() { .hasMessage("'application.id' should not be configured already"); } + @Test + void shouldThrowIfBootstrapServersHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka" + )); + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp<>(new TestApplication(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'bootstrap.servers' should not be configured already"); + } + + @Test + void shouldThrowIfSchemaRegistryHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "my-schema-registry" + )); + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp<>(new TestApplication(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + 
.hasMessage("'schema.registry.url' should not be configured already"); + } + private static class TestApplication implements StreamsApp { @Override From bd99479e9129dde679d689f278ce1f2eb389e2ac Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Wed, 24 Jul 2024 08:01:20 +0200 Subject: [PATCH 12/12] Update --- .../java/com/bakdata/kafka/ConfiguredProducerAppTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java index 71a988ef..6b495a79 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java @@ -104,7 +104,7 @@ void shouldThrowIfKeySerializerHasBeenConfiguredDifferently() { .schemaRegistryUrl("fake") .build())) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("'default.key.serde' should not be configured already"); + .hasMessage("'key.serializer' should not be configured already"); } @Test @@ -119,7 +119,7 @@ void shouldThrowIfValueSerializerHasBeenConfiguredDifferently() { .schemaRegistryUrl("fake") .build())) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("'default.value.serde' should not be configured already"); + .hasMessage("'value.serializer' should not be configured already"); } @Test