diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java index ef628b265..23fcc8f5e 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java @@ -41,12 +41,13 @@ public class KafkaStreamsFactory implements Closeable { /** * Builds the default {@link KafkaStreams} bean from the configuration and the supplied {@link ConfiguredStreamBuilder}. * - * @param builder The builder + * @param config The builder * @return The {@link KafkaStreams} bean */ - @EachBean(ConfiguredStreamBuilder.class) + @EachBean(KafkaStreamsConfiguration.class) @Context - KafkaStreams kafkaStreams(ConfiguredStreamBuilder builder) { + KafkaStreams kafkaStreams(KafkaStreamsConfiguration config) { + ConfiguredStreamBuilder builder = new ConfiguredStreamBuilder(config); KafkaStreams kafkaStreams = new KafkaStreams( builder.build(), builder.getConfiguration() diff --git a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/KafkaStreamsSpec.groovy b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/KafkaStreamsSpec.groovy index 09e36802d..1be528fff 100644 --- a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/KafkaStreamsSpec.groovy +++ b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/KafkaStreamsSpec.groovy @@ -25,6 +25,10 @@ import spock.lang.Shared import spock.lang.Specification import spock.util.concurrent.PollingConditions +import javax.inject.Qualifier +import javax.naming.Name +import javax.validation.groups.Default + class KafkaStreamsSpec extends Specification { @Shared @@ -47,7 +51,8 @@ class KafkaStreamsSpec extends Specification { void "test config"() { when: - def builder = context.getBean(ConfiguredStreamBuilder, Qualifiers.byName('my-stream')) + def config = context.getBean(NamedKafkaStreamsConfiguration, Qualifiers.byName("my-stream")) + def builder = new ConfiguredStreamBuilder(config) then: builder.configuration['application.id'] == "my-stream" @@ -56,7 +61,7 @@ class KafkaStreamsSpec extends Specification { void "test config from stream"() { when: - def stream = context.getBean(KafkaStreams, Qualifiers.byName('my-stream')) + def stream = context.getBean(KafkaStreams, Qualifiers.byName("my-stream")) then: stream.config.originals().get('application.id') == "my-stream" diff --git a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/WordCountStream.java b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/WordCountStream.java index 638ea273d..cfd2cc688 100644 --- a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/WordCountStream.java +++ b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/WordCountStream.java @@ -18,6 +18,7 @@ // tag::imports[] import io.micronaut.context.annotation.Factory; +import kafka.Kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; @@ -48,8 +49,9 @@ public class WordCountStream { // tag::wordCountStream[] @Singleton @Named(STREAM_WORD_COUNT) - KStream wordCountStream(ConfiguredStreamBuilder builder) { // <3> + KStream wordCountStream(NamedKafkaStreamsConfiguration config) { // <3> // set default serdes + ConfiguredStreamBuilder builder = new ConfiguredStreamBuilder(config); Properties props = builder.getConfiguration(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); @@ -82,10 +84,11 @@ KStream wordCountStream(ConfiguredStreamBuilder builder) { // <3 @Singleton @Named(MY_STREAM) KStream myStream( - @Named(MY_STREAM) ConfiguredStreamBuilder builder) { + @Named(MY_STREAM) NamedKafkaStreamsConfiguration config) { // end::namedStream[] // set default serdes + ConfiguredStreamBuilder builder = new ConfiguredStreamBuilder(config); Properties props = builder.getConfiguration(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());