diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml index f3541280..803b2c45 100644 --- a/charts/streams-app-cleanup-job/templates/job.yaml +++ b/charts/streams-app-cleanup-job/templates/job.yaml @@ -75,10 +75,6 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" value: {{ .Values.streams.schemaRegistryUrl | quote }} {{- end }} - {{- if hasKey .Values.streams "productive" }} - - name: "{{ .Values.configurationEnvPrefix }}_PRODUCTIVE" - value: {{ .Values.streams.productive | quote }} - {{- end }} {{- if hasKey .Values "debug" }} - name: "{{ .Values.configurationEnvPrefix }}_DEBUG" value: {{ .Values.debug | quote }} diff --git a/charts/streams-app-cleanup-job/values.yaml b/charts/streams-app-cleanup-job/values.yaml index 9fa76774..ffbfd0e3 100644 --- a/charts/streams-app-cleanup-job/values.yaml +++ b/charts/streams-app-cleanup-job/values.yaml @@ -33,7 +33,6 @@ streams: extraOutputTopics: {} # role: output # errorTopic: error -# productive: true deleteOutput: false commandLine: {} diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml index a662b0d0..c8240ccd 100644 --- a/charts/streams-app/templates/deployment.yaml +++ b/charts/streams-app/templates/deployment.yaml @@ -138,10 +138,6 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL" value: {{ .Values.streams.schemaRegistryUrl | quote }} {{- end }} - {{- if hasKey .Values.streams "productive" }} - - name: "{{ .Values.configurationEnvPrefix }}_PRODUCTIVE" - value: {{ .Values.streams.productive | quote }} - {{- end }} {{- if hasKey .Values "debug" }} - name: "{{ .Values.configurationEnvPrefix }}_DEBUG" value: {{ .Values.debug | quote }} diff --git a/charts/streams-app/values.yaml b/charts/streams-app/values.yaml index 449cffe3..699d1a84 100644 --- a/charts/streams-app/values.yaml +++ b/charts/streams-app/values.yaml @@ -44,7 +44,6 @@ streams: extraOutputTopics: {} # role: output # errorTopic: error - # productive: true commandLine: {} # MY_CLI_PARAM: "foo-bar" diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 6f369b81..bbfffe59 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -53,7 +53,6 @@ *
  • {@link #errorTopic}
  • *
  • {@link #extraInputTopics}
  • *
  • {@link #extraInputPatterns}
  • - *
  • {@link #productive}
  • *
  • {@link #volatileGroupInstanceId}
  • * * To implement your Kafka Streams application inherit from this class and add your custom options. Run it by calling @@ -77,10 +76,6 @@ public abstract class KafkaStreamsApplication extends KafkaApplication { private Map> extraInputTopics = new HashMap<>(); @CommandLine.Option(names = "--extra-input-patterns", split = ",", description = "Additional named input patterns") private Map extraInputPatterns = new HashMap<>(); - @CommandLine.Option(names = "--productive", arity = "0..1", - description = "Whether to use Kafka Streams configuration values, such as replication.factor=3, that are " - + "more suitable for production environments") - private boolean productive; @CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1", description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.") private boolean volatileGroupInstanceId; @@ -212,11 +207,9 @@ private ConfiguredStreamsApp createConfiguredApp(final boolean clean private StreamsAppConfiguration createConfiguration() { final StreamsTopicConfig topics = this.createTopicConfig(); final Map kafkaConfig = this.getKafkaConfig(); - final StreamsConfigurationOptions streamsOptions = this.createStreamsOptions(); return StreamsAppConfiguration.builder() .topics(topics) .kafkaConfig(kafkaConfig) - .options(streamsOptions) .build(); } @@ -238,12 +231,6 @@ private ExecutableStreamsApp createExecutableApp(final boolean clean return configuredStreamsApp.withEndpoint(endpointConfig); } - private StreamsConfigurationOptions createStreamsOptions() { - return StreamsConfigurationOptions.builder() - .productive(this.productive) - .build(); - } - @RequiredArgsConstructor private static class RunningApp implements AutoCloseable { private final @NonNull ExecutableStreamsApp app; diff --git a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 1abc5f6b..94690a4f 100644 --- a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -165,7 +165,6 @@ private KafkaStreamsApplication createWordCountApplication() { final KafkaStreamsApplication application = new SimpleKafkaStreamsApplication(WordCount::new); application.setOutputTopic("word_output"); application.setBrokers(this.kafkaCluster.getBrokerList()); - application.setProductive(false); application.setKafkaConfig(Map.of( StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0", ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java index 01260779..dde4c8ba 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java @@ -55,6 +55,12 @@ private static Map createKafkaProperties(final KafkaEndpointConf kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); } + kafkaConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); + kafkaConfig.put(ProducerConfig.ACKS_CONFIG, "all"); + + // compression + kafkaConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); + return kafkaConfig; } @@ -68,20 +74,26 @@ private static Map createKafkaProperties(final KafkaEndpointConf * {@link KafkaEndpointConfig#isSchemaRegistryConfigured()}. * If Schema Registry is configured, {@link SpecificAvroSerializer} is used, otherwise * {@link StringSerializer} is used. - * - *
  • - * Configs provided by {@link ProducerApp#createKafkaProperties()} - *
  • - *
  • - * Configs provided via environment variables (see - * {@link EnvironmentKafkaConfigParser#parseVariables(Map)}) - *
  • - *
  • - * Configs provided by {@link ProducerAppConfiguration#getKafkaConfig()} - *
  • - *
  • - * Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()} - *
  • + * Additionally, the following is configured: + *
    +     * max.in.flight.requests.per.connection=1
    +     * acks=all
    +     * compression.type=gzip
    +     * 
    + * + *
  • + * Configs provided by {@link ProducerApp#createKafkaProperties()} + *
  • + *
  • + * Configs provided via environment variables (see + * {@link EnvironmentKafkaConfigParser#parseVariables(Map)}) + *
  • + *
  • + * Configs provided by {@link ProducerAppConfiguration#getKafkaConfig()} + *
  • + *
  • + * Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()} + *
  • * * * @return Kafka configuration diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java index 491267da..917fc6c6 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java @@ -31,6 +31,7 @@ import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; @@ -56,6 +57,15 @@ private static Map createKafkaProperties(final KafkaEndpointConf kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); } + // exactly once and order + kafkaConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1); + + kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all"); + + // compression + kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip"); + return kafkaConfig; } @@ -69,31 +79,38 @@ private static Map createKafkaProperties(final KafkaEndpointConf * {@link KafkaEndpointConfig#isSchemaRegistryConfigured()}. * If Schema Registry is configured, {@link SpecificAvroSerde} is used, otherwise {@link StringSerde} is * used. - * - *
  • - * Configs provided by {@link StreamsApp#createKafkaProperties(StreamsConfigurationOptions)} - *
  • - *
  • - * Configs provided via environment variables (see - * {@link EnvironmentKafkaConfigParser#parseVariables(Map)}) - *
  • - *
  • - * Configs provided by {@link StreamsAppConfiguration#getKafkaConfig()} - *
  • - *
  • - * Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()} - *
  • - *
  • - * {@link StreamsConfig#APPLICATION_ID_CONFIG} is configured using - * {@link StreamsApp#getUniqueAppId(StreamsTopicConfig)} - *
  • + * Additionally, exactly-once, in-order, and compression are configured: + *
    +     * processing.guarantee=exactly_once_v2
    +     * producer.max.in.flight.requests.per.connection=1
    +     * producer.acks=all
    +     * producer.compression.type=gzip
    +     * 
    + * + *
  • + * Configs provided by {@link StreamsApp#createKafkaProperties()} + *
  • + *
  • + * Configs provided via environment variables (see + * {@link EnvironmentKafkaConfigParser#parseVariables(Map)}) + *
  • + *
  • + * Configs provided by {@link StreamsAppConfiguration#getKafkaConfig()} + *
  • + *
  • + * Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()} + *
  • + *
  • + * {@link StreamsConfig#APPLICATION_ID_CONFIG} is configured using + * {@link StreamsApp#getUniqueAppId(StreamsTopicConfig)} + *
  • * * * @return Kafka configuration */ public Map getKafkaProperties(final KafkaEndpointConfig endpointConfig) { final Map kafkaConfig = createKafkaProperties(endpointConfig); - kafkaConfig.putAll(this.app.createKafkaProperties(this.configuration.getOptions())); + kafkaConfig.putAll(this.app.createKafkaProperties()); kafkaConfig.putAll(EnvironmentKafkaConfigParser.parseVariables(System.getenv())); kafkaConfig.putAll(this.configuration.getKafkaConfig()); kafkaConfig.putAll(endpointConfig.createKafkaProperties()); diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java index 4cd3a837..b527caee 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java @@ -24,9 +24,9 @@ package com.bakdata.kafka; -import java.util.HashMap; +import static java.util.Collections.emptyMap; + import java.util.Map; -import org.apache.kafka.clients.producer.ProducerConfig; /** * Application that defines how to produce messages to Kafka and necessary configurations @@ -49,38 +49,11 @@ default void setup(final ProducerAppSetupConfiguration configuration) { void run(ProducerBuilder builder); /** - *

    This method should give a default configuration to run your producer application with.

    - * To add a custom configuration, add a similar method to your custom application class: - *
    {@code
    -     *   public Map createKafkaProperties() {
    -     *       # Try to always use the kafka properties from the super class as base Map
    -     *       Map kafkaConfig = ProducerApp.super.createKafkaProperties();
    -     *       kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, GenericAvroSerializer.class);
    -     *       kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericAvroSerializer.class);
    -     *       return kafkaConfig;
    -     *   }
    -     * }
    - * - * Default configuration configures exactly-once, in-order, and compression: - *
    -     * max.in.flight.requests.per.connection=1
    -     * acks=all
    -     * compression.type=gzip
    -     * 
    - * - * @return Returns a default Kafka configuration + * This method should give a default configuration to run your producer application with. + * @return Returns a default Kafka configuration. Empty by default */ default Map createKafkaProperties() { - final Map kafkaConfig = new HashMap<>(); - - // exactly once and order - kafkaConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); - kafkaConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - - // compression - kafkaConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); - - return kafkaConfig; + return emptyMap(); } /** diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java index f555bd5c..01f20552 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java @@ -24,16 +24,14 @@ package com.bakdata.kafka; -import java.util.HashMap; +import static java.util.Collections.emptyMap; + import java.util.Map; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.streams.StreamsConfig; /** * Application that defines a Kafka Streams {@link org.apache.kafka.streams.Topology} and necessary configurations */ public interface StreamsApp extends AutoCloseable { - int DEFAULT_PRODUCTIVE_REPLICATION_FACTOR = 3; /** * Setup Kafka resources, such as topics, before running this app @@ -61,52 +59,11 @@ default void setup(final StreamsAppSetupConfiguration configuration) { String getUniqueAppId(StreamsTopicConfig topics); /** - *

    This method should give a default configuration to run your streaming application with.

    - * To add a custom configuration, add a similar method to your custom application class: - *
    {@code
    -     *   protected Map createKafkaProperties(StreamsOptions options) {
    -     *       # Try to always use the kafka properties from the super class as base Map
    -     *       Map kafkaConfig = StreamsApp.super.createKafkaProperties(options);
    -     *       kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    -     *       kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    -     *       return kafkaConfig;
    -     *   }
    -     * }
    - * - * Default configuration configures exactly-once, in-order, and compression: - *
    -     * processing.guarantee=exactly_once_v2
    -     * producer.max.in.flight.requests.per.connection=1
    -     * producer.acks=all
    -     * producer.compression.type=gzip
    -     * 
    - * - * If {@link StreamsConfigurationOptions#isProductive()} is set the following is configured additionally: - *
    -     * replication.factor=3
    -     * 
    - * - * @param options options to dynamically configure - * @return Returns a default Kafka Streams configuration + * This method should give a default configuration to run your streaming application with. + * @return Returns a default Kafka Streams configuration. Empty by default */ - default Map createKafkaProperties(final StreamsConfigurationOptions options) { - final Map kafkaConfig = new HashMap<>(); - - // exactly once and order - kafkaConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); - kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1); - - // resilience - if (options.isProductive()) { - kafkaConfig.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, DEFAULT_PRODUCTIVE_REPLICATION_FACTOR); - } - - kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all"); - - // compression - kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip"); - - return kafkaConfig; + default Map createKafkaProperties() { + return emptyMap(); } /** diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppConfiguration.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppConfiguration.java index a6862c8c..b20f71c1 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppConfiguration.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppConfiguration.java @@ -38,6 +38,4 @@ public class StreamsAppConfiguration { @NonNull StreamsTopicConfig topics = StreamsTopicConfig.builder().build(); @Builder.Default @NonNull Map kafkaConfig = emptyMap(); - @Builder.Default - @NonNull StreamsConfigurationOptions options = StreamsConfigurationOptions.builder().build(); } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsConfigurationOptions.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsConfigurationOptions.java deleted file mode 100644 index 9c32bc6c..00000000 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsConfigurationOptions.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2024 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.kafka; - -import lombok.Builder; -import lombok.Value; - -/** - * Options to configure {@link StreamsApp#createKafkaProperties(StreamsConfigurationOptions)} - */ -@Builder -@Value -public class StreamsConfigurationOptions { - - /** - * Defines if {@code StreamsApp} should be configured for a productive environment and thus requires resilience - */ - boolean productive; -} diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java index 82c6a8c9..97239e55 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java @@ -92,11 +92,11 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { } @Override - public Map createKafkaProperties(final StreamsConfigurationOptions options) { - final Map properties = StreamsApp.super.createKafkaProperties(options); - properties.put("foo", "bar"); - properties.put("hello", "world"); - return properties; + public Map createKafkaProperties() { + return Map.of( + "foo", "bar", + "hello", "world" + ); } } } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java index 97acdde5..d321e3ca 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java @@ -25,7 +25,6 @@ package com.bakdata.kafka.test_applications; import com.bakdata.kafka.StreamsApp; -import com.bakdata.kafka.StreamsConfigurationOptions; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; @@ -72,10 +71,10 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { } @Override - public Map createKafkaProperties(final StreamsConfigurationOptions options) { - final Map kafkaConfig = StreamsApp.super.createKafkaProperties(options); - kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); - kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); - return kafkaConfig; + public Map createKafkaProperties() { + return Map.of( + StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class, + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class + ); } } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java index 1e9a36e3..67056d93 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java @@ -25,7 +25,6 @@ package com.bakdata.kafka.test_applications; import com.bakdata.kafka.StreamsApp; -import com.bakdata.kafka.StreamsConfigurationOptions; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; @@ -50,10 +49,10 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { } @Override - public Map createKafkaProperties(final StreamsConfigurationOptions options) { - final Map kafkaConfig = StreamsApp.super.createKafkaProperties(options); - kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); - kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); - return kafkaConfig; + public Map createKafkaProperties() { + return Map.of( + StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class, + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class + ); } } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java index 26959f0f..a2ee1aa8 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java @@ -25,7 +25,6 @@ package com.bakdata.kafka.test_applications; import com.bakdata.kafka.StreamsApp; -import com.bakdata.kafka.StreamsConfigurationOptions; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; @@ -50,10 +49,10 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { } @Override - public Map createKafkaProperties(final StreamsConfigurationOptions options) { - final Map kafkaConfig = StreamsApp.super.createKafkaProperties(options); - kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); - kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); - return kafkaConfig; + public Map createKafkaProperties() { + return Map.of( + StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class, + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class + ); } } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java index 82606447..e947ab6f 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java @@ -25,7 +25,6 @@ package com.bakdata.kafka.test_applications; import com.bakdata.kafka.StreamsApp; -import com.bakdata.kafka.StreamsConfigurationOptions; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.TopologyBuilder; @@ -41,6 +40,8 @@ @NoArgsConstructor public class MirrorWithNonDefaultSerde implements StreamsApp { + + boolean foo; @Override public void buildTopology(final TopologyBuilder builder) { final Serde valueSerde = this.getValueSerde(builder.getKafkaProperties()); @@ -61,10 +62,10 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { } @Override - public Map createKafkaProperties(final StreamsConfigurationOptions options) { - final Map kafkaConfig = StreamsApp.super.createKafkaProperties(options); - kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class); - kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class); - return kafkaConfig; + public Map createKafkaProperties() { + return Map.of( + StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class, + StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class + ); } }