diff --git a/gradle.properties b/gradle.properties index 01aa124cc..ff3897924 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -projectVersion=1.2.1.BUILD-SNAPSHOT +projectVersion=2.0.0.BUILD-SNAPSHOT kafkaVersion=2.3.0 micronautDocsVersion=1.0.3 micronautVersion=1.1.4 diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/AbstractKafkaStreamsConfiguration.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/AbstractKafkaStreamsConfiguration.java index 0fe4fd7b0..f8e9875af 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/AbstractKafkaStreamsConfiguration.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/AbstractKafkaStreamsConfiguration.java @@ -33,10 +33,10 @@ * @param The key deserializer type * @param The value deserializer type */ -public class AbstractKafkaStreamsConfiguration extends AbstractKafkaConfiguration { +public abstract class AbstractKafkaStreamsConfiguration extends AbstractKafkaConfiguration implements KafkaStreamsConfiguration { /** - * Construct a new {@link KafkaStreamsConfiguration} for the given defaults. + * Construct a new {@link AbstractKafkaStreamsConfiguration} for the given defaults. * * @param defaultConfiguration The default configuration */ diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/ConfiguredStreamBuilder.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/ConfiguredStreamBuilder.java index 845442a7e..ebab8f529 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/ConfiguredStreamBuilder.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/ConfiguredStreamBuilder.java @@ -35,8 +35,8 @@ public class ConfiguredStreamBuilder extends StreamsBuilder { * * @param configuration The configuration */ - public ConfiguredStreamBuilder(Properties configuration) { - this.configuration.putAll(configuration); + public ConfiguredStreamBuilder(KafkaStreamsConfiguration configuration) { + this.configuration.putAll(configuration.getConfig()); } /** diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/DefaultKafkaStreamsConfiguration.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/DefaultKafkaStreamsConfiguration.java index 80cd815ea..49904bcdf 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/DefaultKafkaStreamsConfiguration.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/DefaultKafkaStreamsConfiguration.java @@ -23,7 +23,6 @@ import javax.inject.Named; import javax.inject.Singleton; -import java.util.Properties; /** * The default streams configuration is non other is present. @@ -33,14 +32,14 @@ * @param * @param */ -@Requires(missingProperty = KafkaStreamsConfiguration.PREFIX + ".default") +@Requires(missingProperty = NamedKafkaStreamsConfiguration.PREFIX + ".default") @Singleton @Requires(beans = KafkaDefaultConfiguration.class) @Named("default") @Primary public class DefaultKafkaStreamsConfiguration extends AbstractKafkaStreamsConfiguration { /** - * Construct a new {@link KafkaStreamsConfiguration} for the given defaults. + * Construct a new {@link DefaultKafkaStreamsConfiguration} for the given defaults. * * @param defaultConfiguration The default configuration * @param applicationConfiguration The application configuration @@ -50,8 +49,6 @@ public DefaultKafkaStreamsConfiguration(KafkaDefaultConfiguration defaultConfigu ApplicationConfiguration applicationConfiguration, Environment environment) { super(defaultConfiguration); - Properties config = getConfig(); - config.putAll(defaultConfiguration.getConfig()); - init(applicationConfiguration, environment, config); + init(applicationConfiguration, environment, getConfig()); } } diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsConfiguration.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsConfiguration.java index 1512106af..451f8ffc8 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsConfiguration.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsConfiguration.java @@ -1,65 +1,18 @@ -/* - * Copyright 2017-2019 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package io.micronaut.configuration.kafka.streams; -import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration; -import io.micronaut.context.annotation.EachProperty; -import io.micronaut.context.annotation.Parameter; -import io.micronaut.context.annotation.Requires; -import io.micronaut.context.env.Environment; -import io.micronaut.core.naming.NameUtils; -import io.micronaut.runtime.ApplicationConfiguration; - import java.util.Properties; -import static io.micronaut.configuration.kafka.streams.KafkaStreamsConfiguration.PREFIX; - /** - * The default configuration passed to {@link org.apache.kafka.streams.KafkaStreams}. + * Interface used in {@link ConfiguredStreamBuilder}. * - * @param The generic key type - * @param The generic value type + * @since 2.0 + * @author bobby */ -@EachProperty(value = PREFIX, primary = "default") -@Requires(beans = KafkaDefaultConfiguration.class) -public class KafkaStreamsConfiguration extends AbstractKafkaStreamsConfiguration { - - /** - * The default streams configuration. - */ - public static final String PREFIX = "kafka.streams"; - - +public interface KafkaStreamsConfiguration { /** - * Construct a new {@link KafkaStreamsConfiguration} for the given defaults. + * The configuration. * - * @param streamName The stream name - * @param defaultConfiguration The default configuration - * @param applicationConfiguration The application configuration - * @param environment The environment + * @return Properties for the streams configuration */ - public KafkaStreamsConfiguration( - @Parameter String streamName, - KafkaDefaultConfiguration defaultConfiguration, - ApplicationConfiguration applicationConfiguration, - Environment environment) { - super(defaultConfiguration); - Properties config = getConfig(); - String propertyKey = PREFIX + '.' + NameUtils.hyphenate(streamName, true); - config.putAll(environment.getProperty(propertyKey, Properties.class).orElseGet(Properties::new)); - init(applicationConfiguration, environment, config); - } + Properties getConfig(); } diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java index 50e65fc6a..23fcc8f5e 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java @@ -19,7 +19,6 @@ import io.micronaut.context.annotation.EachBean; import io.micronaut.context.annotation.Factory; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.kstream.KStream; import javax.annotation.PreDestroy; import javax.inject.Singleton; @@ -28,7 +27,6 @@ import java.util.Collection; import java.util.concurrent.ConcurrentLinkedDeque; - /** * A factory that constructs the {@link KafkaStreams} bean. * @@ -40,30 +38,16 @@ public class KafkaStreamsFactory implements Closeable { private final Collection streams = new ConcurrentLinkedDeque<>(); - /** - * Exposes the {@link ConfiguredStreamBuilder} as a bean. - * - * @param configuration The configuration - * @return The streams builder - */ - @EachBean(AbstractKafkaStreamsConfiguration.class) - ConfiguredStreamBuilder streamsBuilder(AbstractKafkaStreamsConfiguration configuration) { - return new ConfiguredStreamBuilder(configuration.getConfig()); - } - /** * Builds the default {@link KafkaStreams} bean from the configuration and the supplied {@link ConfiguredStreamBuilder}. * - * @param builder The builder - * @param kStreams The KStream definitions + * @param config The builder * @return The {@link KafkaStreams} bean */ - @EachBean(ConfiguredStreamBuilder.class) + @EachBean(KafkaStreamsConfiguration.class) @Context - KafkaStreams kafkaStreams( - ConfiguredStreamBuilder builder, - // required for initialization. DO NOT DELETE - KStream... kStreams) { + KafkaStreams kafkaStreams(KafkaStreamsConfiguration config) { + ConfiguredStreamBuilder builder = new ConfiguredStreamBuilder(config); KafkaStreams kafkaStreams = new KafkaStreams( builder.build(), builder.getConfiguration() @@ -94,5 +78,4 @@ public void close() { } } } - } diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/NamedKafkaStreamsConfiguration.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/NamedKafkaStreamsConfiguration.java new file mode 100644 index 000000000..d5957cfa2 --- /dev/null +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/NamedKafkaStreamsConfiguration.java @@ -0,0 +1,65 @@ +/* + * Copyright 2017-2019 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.kafka.streams; + +import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration; +import io.micronaut.context.annotation.EachProperty; +import io.micronaut.context.annotation.Parameter; +import io.micronaut.context.annotation.Requires; +import io.micronaut.context.env.Environment; +import io.micronaut.core.naming.NameUtils; +import io.micronaut.runtime.ApplicationConfiguration; + +import java.util.Properties; + +import static io.micronaut.configuration.kafka.streams.NamedKafkaStreamsConfiguration.PREFIX; + +/** + * The default configuration passed to {@link org.apache.kafka.streams.KafkaStreams}. + * + * @param The generic key type + * @param The generic value type + */ +@EachProperty(value = PREFIX, primary = "default") +@Requires(beans = KafkaDefaultConfiguration.class) +public class NamedKafkaStreamsConfiguration extends AbstractKafkaStreamsConfiguration { + + /** + * The default streams configuration. + */ + public static final String PREFIX = "kafka.streams"; + + + /** + * Construct a new {@link NamedKafkaStreamsConfiguration} for the given defaults. + * + * @param streamName The stream name + * @param defaultConfiguration The default configuration + * @param applicationConfiguration The application configuration + * @param environment The environment + */ + public NamedKafkaStreamsConfiguration( + @Parameter String streamName, + KafkaDefaultConfiguration defaultConfiguration, + ApplicationConfiguration applicationConfiguration, + Environment environment) { + super(defaultConfiguration); + Properties config = getConfig(); + String propertyKey = PREFIX + '.' + NameUtils.hyphenate(streamName, true); + config.putAll(environment.getProperty(propertyKey, Properties.class).orElseGet(Properties::new)); + init(applicationConfiguration, environment, config); + } +} diff --git a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/KafkaStreamsSpec.groovy b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/KafkaStreamsSpec.groovy index 09e36802d..1be528fff 100644 --- a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/KafkaStreamsSpec.groovy +++ b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/KafkaStreamsSpec.groovy @@ -25,6 +25,10 @@ import spock.lang.Shared import spock.lang.Specification import spock.util.concurrent.PollingConditions +import javax.inject.Qualifier +import javax.naming.Name +import javax.validation.groups.Default + class KafkaStreamsSpec extends Specification { @Shared @@ -47,7 +51,8 @@ class KafkaStreamsSpec extends Specification { void "test config"() { when: - def builder = context.getBean(ConfiguredStreamBuilder, Qualifiers.byName('my-stream')) + def config = context.getBean(NamedKafkaStreamsConfiguration, Qualifiers.byName("my-stream")) + def builder = new ConfiguredStreamBuilder(config) then: builder.configuration['application.id'] == "my-stream" @@ -56,7 +61,7 @@ class KafkaStreamsSpec extends Specification { void "test config from stream"() { when: - def stream = context.getBean(KafkaStreams, Qualifiers.byName('my-stream')) + def stream = context.getBean(KafkaStreams, Qualifiers.byName("my-stream")) then: stream.config.originals().get('application.id') == "my-stream" diff --git a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/WordCountStream.java b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/WordCountStream.java index 638ea273d..cfd2cc688 100644 --- a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/WordCountStream.java +++ b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/WordCountStream.java @@ -18,6 +18,7 @@ // tag::imports[] import io.micronaut.context.annotation.Factory; +import kafka.Kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; @@ -48,8 +49,9 @@ public class WordCountStream { // tag::wordCountStream[] @Singleton @Named(STREAM_WORD_COUNT) - KStream wordCountStream(ConfiguredStreamBuilder builder) { // <3> + KStream wordCountStream(NamedKafkaStreamsConfiguration config) { // <3> // set default serdes + ConfiguredStreamBuilder builder = new ConfiguredStreamBuilder(config); Properties props = builder.getConfiguration(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); @@ -82,10 +84,11 @@ KStream wordCountStream(ConfiguredStreamBuilder builder) { // <3 @Singleton @Named(MY_STREAM) KStream myStream( - @Named(MY_STREAM) ConfiguredStreamBuilder builder) { + @Named(MY_STREAM) NamedKafkaStreamsConfiguration config) { // end::namedStream[] // set default serdes + ConfiguredStreamBuilder builder = new ConfiguredStreamBuilder(config); Properties props = builder.getConfiguration(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());