diff --git a/README.md b/README.md index 6dd05694..ea027e22 100644 --- a/README.md +++ b/README.md @@ -58,15 +58,17 @@ and `getUniqueAppId()`. You can define the topology of your application in `buil ```java import com.bakdata.kafka.KafkaStreamsApplication; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; import java.util.Map; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; -public class StreamsBootstrapApplication extends KafkaStreamsApplication { +public class MyStreamsApplication extends KafkaStreamsApplication { public static void main(final String[] args) { - startApplication(new StreamsBootstrapApplication(), args); + startApplication(new MyStreamsApplication(), args); } @Override @@ -86,6 +88,11 @@ public class StreamsBootstrapApplication extends KafkaStreamsApplication { return "streams-bootstrap-app-" + topics.getOutputTopic(); } + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } + // Optionally you can define custom Kafka properties @Override public Map createKafkaProperties() { @@ -142,12 +149,14 @@ import com.bakdata.kafka.KafkaProducerApplication; import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; +import com.bakdata.kafka.SerializerConfig; import java.util.Map; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.StringSerializer; -public class StreamsBootstrapApplication extends KafkaProducerApplication { +public class MyProducerApplication extends KafkaProducerApplication { public static void main(final String[] args) { - startApplication(new StreamsBootstrapApplication(), args); + startApplication(new MyProducerApplication(), args); } @Override @@ -162,6 +171,11 @@ public class StreamsBootstrapApplication extends KafkaProducerApplication { }; } + @Override + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, StringSerializer.class); + } + // Optionally you can define custom Kafka properties @Override public Map createKafkaProperties() { diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index cdede819..88dad30a 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -38,6 +38,7 @@ import net.mguenther.kafka.junit.SendKeyValues; import net.mguenther.kafka.junit.TopicConfig; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.Consumed; import org.junit.jupiter.api.Test; @@ -63,6 +64,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { throw new UnsupportedOperationException(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } }; } @@ -91,6 +97,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { throw new UnsupportedOperationException(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } }), new String[]{ "--brokers", "localhost:9092", "--schema-registry-url", "http://localhost:8081", @@ -115,6 +126,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { throw new UnsupportedOperationException(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } }; } @@ -147,6 +163,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { throw new UnsupportedOperationException(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } }; } @@ -179,6 +200,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return "app"; } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } })) { kafkaCluster.start(); kafkaCluster.createTopic(TopicConfig.withName(input).build()); @@ -210,6 +236,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return "app"; } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } })) { kafkaCluster.start(); kafkaCluster.createTopic(TopicConfig.withName(input).build()); @@ -249,6 +280,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { throw new UnsupportedOperationException(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } }; } }, new String[]{ @@ -275,6 +311,11 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { throw new UnsupportedOperationException(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + throw new UnsupportedOperationException(); + } }; } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java index 6e928d43..640328eb 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java @@ -27,6 +27,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor @@ -57,6 +58,11 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { return CloseFlagApp.this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } + @Override public void close() { CloseFlagApp.this.appClosed = true; diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index d460658b..67232632 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -32,11 +32,13 @@ import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; +import com.bakdata.kafka.SerializerConfig; import com.bakdata.kafka.SimpleKafkaProducerApplication; import com.bakdata.kafka.TestRecord; import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import java.util.Map; import java.util.concurrent.TimeUnit; import net.mguenther.kafka.junit.EmbeddedKafkaCluster; @@ -44,7 +46,6 @@ import net.mguenther.kafka.junit.TopicConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -85,10 +86,8 @@ public ProducerRunnable buildRunnable(final ProducerBuilder builder) { } @Override - public Map createKafkaProperties() { - return Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class - ); + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class); } })) { app.setBrokers(this.kafkaCluster.getBrokerList()); diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/Mirror.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/Mirror.java index 212c7611..54aee24c 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/Mirror.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/Mirror.java @@ -24,10 +24,12 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; import lombok.NoArgsConstructor; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor @@ -43,4 +45,9 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } + } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/WordCount.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/WordCount.java index 2bcdc095..58f6d7af 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/WordCount.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/test_applications/WordCount.java @@ -24,6 +24,7 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; @@ -31,6 +32,7 @@ import java.util.regex.Pattern; import lombok.NoArgsConstructor; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -56,4 +58,9 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } } diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts index ce3f20eb..27675109 100644 --- a/streams-bootstrap-core/build.gradle.kts +++ b/streams-bootstrap-core/build.gradle.kts @@ -11,7 +11,7 @@ dependencies { api(group = "org.apache.kafka", name = "kafka-streams", version = kafkaVersion) api(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion) val confluentVersion: String by project - implementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) + implementation(group = "io.confluent", name = "kafka-schema-serializer", version = confluentVersion) api(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion) api( group = "org.slf4j", @@ -42,6 +42,7 @@ dependencies { testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) { exclude(group = "org.slf4j", module = "slf4j-log4j12") } + testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) val log4jVersion: String by project testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/App.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/App.java index 65ed48bb..d83fb757 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/App.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/App.java @@ -33,7 +33,6 @@ * @param type of topic config * @param type of clean up config */ -@FunctionalInterface public interface App extends AutoCloseable { /** @@ -63,4 +62,10 @@ default Map createKafkaProperties() { default void setup(final EffectiveAppConfiguration configuration) { // do nothing by default } + + /** + * Configure default serialization behavior + * @return {@code SerializationConfig} + */ + SerializationConfig defaultSerializationConfig(); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java index 59983d2c..bf806ecb 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java @@ -26,14 +26,12 @@ import static java.util.Collections.emptyMap; -import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import java.util.HashMap; import java.util.Map; import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; /** * A {@link ProducerApp} with a corresponding {@link AppConfiguration} @@ -45,17 +43,9 @@ public class ConfiguredProducerApp implements ConfiguredA private final @NonNull T app; private final @NonNull AppConfiguration configuration; - private static Map createBaseConfig(final KafkaEndpointConfig endpointConfig) { + private static Map createBaseConfig() { final Map kafkaConfig = new HashMap<>(); - if (endpointConfig.isSchemaRegistryConfigured()) { - kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class); - kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class); - } else { - kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - } - kafkaConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); kafkaConfig.put(ProducerConfig.ACKS_CONFIG, "all"); @@ -70,12 +60,6 @@ private static Map createBaseConfig(final KafkaEndpointConfig en * Configuration is created in the following order *
    *
  • - * {@link ProducerConfig#KEY_SERIALIZER_CLASS_CONFIG} and - * {@link ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG} are configured based on - * {@link KafkaEndpointConfig#isSchemaRegistryConfigured()}. - * If Schema Registry is configured, {@link SpecificAvroSerializer} is used, otherwise - * {@link StringSerializer} is used. - * Additionally, the following is configured: *
          * max.in.flight.requests.per.connection=1
          * acks=all
    @@ -95,6 +79,11 @@ private static Map createBaseConfig(final KafkaEndpointConfig en
          *     
  • * Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()} *
  • + *
  • + * {@link ProducerConfig#KEY_SERIALIZER_CLASS_CONFIG} and + * {@link ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG} is configured using + * {@link ProducerApp#defaultSerializationConfig()} + *
  • *
* * @param endpointConfig endpoint to run app on @@ -130,7 +119,7 @@ public void close() { } private KafkaPropertiesFactory createPropertiesFactory(final KafkaEndpointConfig endpointConfig) { - final Map baseConfig = createBaseConfig(endpointConfig); + final Map baseConfig = createBaseConfig(); return KafkaPropertiesFactory.builder() .baseConfig(baseConfig) .app(this.app) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java index 86bb3c2e..0724652e 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -32,7 +31,6 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; @@ -46,17 +44,9 @@ public class ConfiguredStreamsApp implements ConfiguredApp private final @NonNull T app; private final @NonNull AppConfiguration configuration; - private static Map createBaseConfig(final KafkaEndpointConfig endpointConfig) { + private static Map createBaseConfig() { final Map kafkaConfig = new HashMap<>(); - if (endpointConfig.isSchemaRegistryConfigured()) { - kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); - kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); - } else { - kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); - kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); - } - // exactly once and order kafkaConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1); @@ -74,12 +64,7 @@ private static Map createBaseConfig(final KafkaEndpointConfig en * Configuration is created in the following order *
    *
  • - * {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG} and - * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG} are configured based on - * {@link KafkaEndpointConfig#isSchemaRegistryConfigured()}. - * If Schema Registry is configured, {@link SpecificAvroSerde} is used, otherwise {@link StringSerde} is - * used. - * Additionally, exactly-once, in-order, and compression are configured: + * Exactly-once, in-order, and compression are configured: *
          * processing.guarantee=exactly_once_v2
          * producer.max.in.flight.requests.per.connection=1
    @@ -101,6 +86,11 @@ private static Map createBaseConfig(final KafkaEndpointConfig en
          *         Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()}
          *     
  • *
  • + * {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG} and + * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG} is configured using + * {@link StreamsApp#defaultSerializationConfig()} + *
  • + *
  • * {@link StreamsConfig#APPLICATION_ID_CONFIG} is configured using * {@link StreamsApp#getUniqueAppId(StreamsTopicConfig)} *
  • @@ -169,7 +159,7 @@ public void close() { } private KafkaPropertiesFactory createPropertiesFactory(final KafkaEndpointConfig endpointConfig) { - final Map baseConfig = createBaseConfig(endpointConfig); + final Map baseConfig = createBaseConfig(); return KafkaPropertiesFactory.builder() .baseConfig(baseConfig) .app(this.app) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java index 5aa42876..b9552ac4 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaEndpointConfig.java @@ -52,17 +52,10 @@ public class KafkaEndpointConfig { public Map createKafkaProperties() { final Map kafkaConfig = new HashMap<>(); kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokers); - if (this.isSchemaRegistryConfigured()) { + if (this.schemaRegistryUrl != null) { kafkaConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryUrl); } return Collections.unmodifiableMap(kafkaConfig); } - /** - * Check if schema registry has been configured - * @return true if {@link #schemaRegistryUrl} has been configured - */ - public boolean isSchemaRegistryConfigured() { - return this.schemaRegistryUrl != null; - } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java index 02cf5c41..1c38c5a8 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java @@ -38,12 +38,42 @@ class KafkaPropertiesFactory { private final @NonNull KafkaEndpointConfig endpointConfig; Map createKafkaProperties(final Map configOverrides) { - final Map kafkaConfig = new HashMap<>(this.baseConfig); - kafkaConfig.putAll(this.app.createKafkaProperties()); - kafkaConfig.putAll(EnvironmentKafkaConfigParser.parseVariables(System.getenv())); - kafkaConfig.putAll(this.configuration.getKafkaConfig()); - kafkaConfig.putAll(this.endpointConfig.createKafkaProperties()); - kafkaConfig.putAll(configOverrides); - return Collections.unmodifiableMap(kafkaConfig); + return new Task().createKafkaProperties(configOverrides); + } + + private class Task { + private final Map kafkaConfig = new HashMap<>(); + + private Map createKafkaProperties(final Map configOverrides) { + this.putAll(KafkaPropertiesFactory.this.baseConfig); + this.putAll(KafkaPropertiesFactory.this.app.createKafkaProperties()); + this.putAll(EnvironmentKafkaConfigParser.parseVariables(System.getenv())); + this.putAll(KafkaPropertiesFactory.this.configuration.getKafkaConfig()); + this.putAllValidating(KafkaPropertiesFactory.this.endpointConfig.createKafkaProperties()); + final SerializationConfig serializationConfig = + KafkaPropertiesFactory.this.app.defaultSerializationConfig(); + this.putAllValidating(serializationConfig.createProperties()); + this.putAllValidating(configOverrides); + return Collections.unmodifiableMap(this.kafkaConfig); + } + + private void putAllValidating(final Map configs) { + this.validateNotSet(configs); + this.putAll(configs); + } + + private void putAll(final Map configs) { + this.kafkaConfig.putAll(configs); + } + + private void validateNotSet(final Map configs) { + configs.keySet().forEach(this::validateNotSet); + } + + private void validateNotSet(final String key) { + if (this.kafkaConfig.containsKey(key)) { + throw new IllegalArgumentException(String.format("'%s' should not be configured already", key)); + } + } } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java index 42cee6e6..e9113274 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerApp.java @@ -27,7 +27,6 @@ /** * Application that defines how to produce messages to Kafka and necessary configurations */ -@FunctionalInterface public interface ProducerApp extends App { /** @@ -46,4 +45,7 @@ default ProducerCleanUpConfiguration setupCleanUp( final EffectiveAppConfiguration configuration) { return new ProducerCleanUpConfiguration(); } + + @Override + SerializerConfig defaultSerializationConfig(); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java new file mode 100644 index 00000000..70e947d1 --- /dev/null +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerdeConfig.java @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.util.Map; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.With; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.StreamsConfig; + +/** + * Defines how to (de-)serialize the data in a Kafka Streams app + */ +@RequiredArgsConstructor +@With +public class SerdeConfig implements SerializationConfig { + + private final @NonNull Class keySerde; + private final @NonNull Class valueSerde; + + @Override + public Map createProperties() { + return Map.of( + StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, this.keySerde, + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, this.valueSerde + ); + } +} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializationConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializationConfig.java new file mode 100644 index 00000000..0c63b693 --- /dev/null +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializationConfig.java @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.util.Map; + +/** + * Defines how to (de-)serialize the data in a Kafka client + */ +@FunctionalInterface +public interface SerializationConfig { + + /** + * Create properties from this {@code SerializationConfig} + * @return Map of serialization configurations + */ + Map createProperties(); +} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java new file mode 100644 index 00000000..13386621 --- /dev/null +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/SerializerConfig.java @@ -0,0 +1,51 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.util.Map; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.With; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serializer; + +/** + * Defines how to serialize the data in a Kafka producer + */ +@RequiredArgsConstructor +@With +public class SerializerConfig implements SerializationConfig { + + private final @NonNull Class keySerializer; + private final @NonNull Class valueSerializer; + + @Override + public Map createProperties() { + return Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, this.keySerializer, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.valueSerializer + ); + } +} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsApp.java index 0802720a..9adde556 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsApp.java @@ -55,4 +55,7 @@ default StreamsCleanUpConfiguration setupCleanUp( final EffectiveAppConfiguration configuration) { return new StreamsCleanUpConfiguration(); } + + @Override + SerdeConfig defaultSerializationConfig(); } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java index 57dbccb3..225ee0f2 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java @@ -27,9 +27,13 @@ import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; -import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import java.util.Map; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; import org.junitpioneer.jupiter.SetEnvironmentVariable; @@ -76,7 +80,7 @@ void shouldPrioritizeEnvironmentConfigs() { } @Test - void shouldSetDefaultAvroSerializerWhenSchemaRegistryUrlIsSet() { + void shouldSetDefaultSerializer() { final AppConfiguration configuration = newAppConfiguration(); final ConfiguredProducerApp configuredApp = new ConfiguredProducerApp<>(new TestProducer(), configuration); @@ -84,20 +88,68 @@ void shouldSetDefaultAvroSerializerWhenSchemaRegistryUrlIsSet() { .brokers("fake") .schemaRegistryUrl("fake") .build())) - .containsEntry(KEY_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .containsEntry(VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class); + .containsEntry(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .containsEntry(VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); } @Test - void shouldSetDefaultStringSerializerWhenSchemaRegistryUrlIsNotSet() { - final AppConfiguration configuration = newAppConfiguration(); + void shouldThrowIfKeySerializerHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class + )); final ConfiguredProducerApp configuredApp = new ConfiguredProducerApp<>(new TestProducer(), configuration); - assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") + .schemaRegistryUrl("fake") .build())) - .containsEntry(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .containsEntry(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'key.serializer' should not be configured already"); + } + + @Test + void shouldThrowIfValueSerializerHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class + )); + final ConfiguredProducerApp configuredApp = + new ConfiguredProducerApp<>(new TestProducer(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'value.serializer' should not be configured already"); + } + + @Test + void shouldThrowIfBootstrapServersHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka" + )); + final ConfiguredProducerApp configuredApp = + new ConfiguredProducerApp<>(new TestProducer(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'bootstrap.servers' should not be configured already"); + } + + @Test + void shouldThrowIfSchemaRegistryHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "my-schema-registry" + )); + final ConfiguredProducerApp configuredApp = + new ConfiguredProducerApp<>(new TestProducer(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'schema.registry.url' should not be configured already"); } private static class TestProducer implements ProducerApp { @@ -114,5 +166,10 @@ public Map createKafkaProperties() { "hello", "world" ); } + + @Override + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, LongSerializer.class); + } } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java index b1b3c648..46f020e4 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java @@ -24,14 +24,17 @@ package com.bakdata.kafka; -import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; -import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import java.util.Map; +import org.apache.kafka.common.serialization.Serdes.ByteArraySerde; +import org.apache.kafka.common.serialization.Serdes.LongSerde; import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.streams.StreamsConfig; import org.junit.jupiter.api.Test; import org.junitpioneer.jupiter.SetEnvironmentVariable; @@ -77,7 +80,7 @@ void shouldPrioritizeEnvironmentConfigs() { } @Test - void shouldSetDefaultAvroSerdeWhenSchemaRegistryUrlIsSet() { + void shouldSetDefaultSerde() { final AppConfiguration configuration = newAppConfiguration(); final ConfiguredStreamsApp configuredApp = new ConfiguredStreamsApp<>(new TestApplication(), configuration); @@ -85,21 +88,83 @@ void shouldSetDefaultAvroSerdeWhenSchemaRegistryUrlIsSet() { .brokers("fake") .schemaRegistryUrl("fake") .build())) - .containsEntry(DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class) - .containsEntry(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class) - .containsEntry(SCHEMA_REGISTRY_URL_CONFIG, "fake"); + .containsEntry(DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class) + .containsEntry(DEFAULT_VALUE_SERDE_CLASS_CONFIG, LongSerde.class); } @Test - void shouldSetDefaultStringSerdeWhenSchemaRegistryUrlIsNotSet() { - final AppConfiguration configuration = newAppConfiguration(); + void shouldThrowIfKeySerdeHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + DEFAULT_KEY_SERDE_CLASS_CONFIG, ByteArraySerde.class + )); final ConfiguredStreamsApp configuredApp = new ConfiguredStreamsApp<>(new TestApplication(), configuration); - assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") + .schemaRegistryUrl("fake") .build())) - .containsEntry(DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class) - .containsEntry(DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'default.key.serde' should not be configured already"); + } + + @Test + void shouldThrowIfValueSerdeHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + DEFAULT_VALUE_SERDE_CLASS_CONFIG, ByteArraySerde.class + )); + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp<>(new TestApplication(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'default.value.serde' should not be configured already"); + } + + @Test + void shouldThrowIfAppIdHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + StreamsConfig.APPLICATION_ID_CONFIG, "my-app" + )); + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp<>(new TestApplication(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'application.id' should not be configured already"); + } + + @Test + void shouldThrowIfBootstrapServersHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka" + )); + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp<>(new TestApplication(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'bootstrap.servers' should not be configured already"); + } + + @Test + void shouldThrowIfSchemaRegistryHasBeenConfiguredDifferently() { + final AppConfiguration configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of( + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "my-schema-registry" + )); + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp<>(new TestApplication(), configuration); + assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() + .brokers("fake") + .schemaRegistryUrl("fake") + .build())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'schema.registry.url' should not be configured already"); } private static class TestApplication implements StreamsApp { @@ -121,5 +186,10 @@ public Map createKafkaProperties() { "hello", "world" ); } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, LongSerde.class); + } } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java index 03fbe575..61108d65 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.function.Consumer; import java.util.function.Supplier; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -114,5 +115,10 @@ public ProducerCleanUpConfiguration setupCleanUp( public ProducerRunnable buildRunnable(final ProducerBuilder builder) { return () -> {}; } + + @Override + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(ByteArraySerializer.class, ByteArraySerializer.class); + } } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java index c1ff3fbf..0fc4e58c 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.function.Consumer; import java.util.function.Supplier; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -124,5 +125,10 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return "foo"; } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java index 555290e1..a65e729c 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java @@ -31,6 +31,7 @@ import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.ConfiguredStreamsApp; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsExecutionOptions; import com.bakdata.kafka.StreamsRunner; @@ -49,6 +50,7 @@ import net.mguenther.kafka.junit.TopicConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams.State; @@ -92,8 +94,8 @@ static Thread run(final StreamsRunner runner) { static ConfiguredStreamsApp configureApp(final StreamsApp app, final StreamsTopicConfig topics) { final AppConfiguration configuration = new AppConfiguration<>(topics, Map.of( - StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0", - ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" + StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0", + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" )); return new ConfiguredStreamsApp<>(app, configuration); } @@ -250,5 +252,10 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroKeyProducer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroKeyProducer.java index 5848accc..739912f9 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroKeyProducer.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroKeyProducer.java @@ -27,10 +27,10 @@ import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; +import com.bakdata.kafka.SerializerConfig; import com.bakdata.kafka.TestRecord; -import java.util.Map; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; @@ -46,9 +46,7 @@ public ProducerRunnable buildRunnable(final ProducerBuilder builder) { } @Override - public Map createKafkaProperties() { - return Map.of( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class - ); + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(SpecificAvroSerializer.class, StringSerializer.class); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroValueProducer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroValueProducer.java index 64356ce2..1b373284 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroValueProducer.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/AvroValueProducer.java @@ -27,10 +27,10 @@ import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; +import com.bakdata.kafka.SerializerConfig; import com.bakdata.kafka.TestRecord; -import java.util.Map; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; @@ -46,9 +46,7 @@ public ProducerRunnable buildRunnable(final ProducerBuilder builder) { } @Override - public Map createKafkaProperties() { - return Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class - ); + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java index d321e3ca..4f5b2680 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java @@ -24,17 +24,16 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import java.time.Duration; -import java.util.Map; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -71,10 +70,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { } @Override - public Map createKafkaProperties() { - return Map.of( - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class, - StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class - ); + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, SpecificAvroSerde.class); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ExtraInputTopics.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ExtraInputTopics.java index 88c7d0e3..41e192c1 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ExtraInputTopics.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/ExtraInputTopics.java @@ -24,10 +24,12 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; import lombok.NoArgsConstructor; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor @@ -42,4 +44,9 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/Mirror.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/Mirror.java index 212c7611..54aee24c 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/Mirror.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/Mirror.java @@ -24,10 +24,12 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; import lombok.NoArgsConstructor; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor @@ -43,4 +45,9 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } + } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java index 67056d93..6f303f2e 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java @@ -24,15 +24,14 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; -import java.util.Map; import lombok.NoArgsConstructor; import org.apache.kafka.common.serialization.Serdes.StringSerde; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor @@ -49,10 +48,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { } @Override - public Map createKafkaProperties() { - return Map.of( - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class, - StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class - ); + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(SpecificAvroSerde.class, StringSerde.class); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java index a2ee1aa8..fb8087d8 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java @@ -24,15 +24,14 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; -import java.util.Map; import lombok.NoArgsConstructor; import org.apache.kafka.common.serialization.Serdes.StringSerde; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor @@ -49,10 +48,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { } @Override - public Map createKafkaProperties() { - return Map.of( - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class, - StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class - ); + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, SpecificAvroSerde.class); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java index 846e851f..7406b484 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java @@ -25,16 +25,15 @@ package com.bakdata.kafka.test_applications; import com.bakdata.kafka.Configurator; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; -import java.util.Map; import lombok.NoArgsConstructor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes.StringSerde; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; @@ -66,10 +65,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { } @Override - public Map createKafkaProperties() { - return Map.of( - StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class, - StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class - ); + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/StringProducer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/StringProducer.java index 10de4059..296bf8d0 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/StringProducer.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/StringProducer.java @@ -27,8 +27,10 @@ import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; +import com.bakdata.kafka.SerializerConfig; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; public class StringProducer implements ProducerApp { @Override @@ -39,4 +41,9 @@ public ProducerRunnable buildRunnable(final ProducerBuilder builder) { } }; } + + @Override + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, StringSerializer.class); + } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCount.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCount.java index 2bcdc095..58f6d7af 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCount.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCount.java @@ -24,6 +24,7 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; @@ -31,6 +32,7 @@ import java.util.regex.Pattern; import lombok.NoArgsConstructor; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -56,4 +58,9 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCountPattern.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCountPattern.java index 9c86b0b8..16b8f739 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCountPattern.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/WordCountPattern.java @@ -24,6 +24,7 @@ package com.bakdata.kafka.test_applications; +import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; @@ -32,6 +33,7 @@ import lombok.NoArgsConstructor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -58,4 +60,9 @@ public void buildTopology(final TopologyBuilder builder) { public String getUniqueAppId(final StreamsTopicConfig topics) { return this.getClass().getSimpleName() + "-" + topics.getOutputTopic(); } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } }