Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spike on a new API for Kafka Streams support #34

Open
wants to merge 4 commits into
base: 5.4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
projectVersion=1.2.1.BUILD-SNAPSHOT
projectVersion=2.0.0.BUILD-SNAPSHOT
kafkaVersion=2.3.0
micronautDocsVersion=1.0.3
micronautVersion=1.1.4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
* @param <K> The key deserializer type
* @param <V> The value deserializer type
*/
public class AbstractKafkaStreamsConfiguration<K, V> extends AbstractKafkaConfiguration<K, V> {
public abstract class AbstractKafkaStreamsConfiguration<K, V> extends AbstractKafkaConfiguration<K, V> implements KafkaStreamsConfiguration {

/**
* Construct a new {@link KafkaStreamsConfiguration} for the given defaults.
* Construct a new {@link AbstractKafkaStreamsConfiguration} for the given defaults.
*
* @param defaultConfiguration The default configuration
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public class ConfiguredStreamBuilder extends StreamsBuilder {
*
* @param configuration The configuration
*/
public ConfiguredStreamBuilder(Properties configuration) {
this.configuration.putAll(configuration);
public ConfiguredStreamBuilder(KafkaStreamsConfiguration configuration) {
this.configuration.putAll(configuration.getConfig());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Properties;

/**
* The default streams configuration is non other is present.
Expand All @@ -33,14 +32,14 @@
* @param <K>
* @param <V>
*/
@Requires(missingProperty = KafkaStreamsConfiguration.PREFIX + ".default")
@Requires(missingProperty = NamedKafkaStreamsConfiguration.PREFIX + ".default")
@Singleton
@Requires(beans = KafkaDefaultConfiguration.class)
@Named("default")
@Primary
public class DefaultKafkaStreamsConfiguration<K, V> extends AbstractKafkaStreamsConfiguration<K, V> {
/**
* Construct a new {@link KafkaStreamsConfiguration} for the given defaults.
* Construct a new {@link DefaultKafkaStreamsConfiguration} for the given defaults.
*
* @param defaultConfiguration The default configuration
* @param applicationConfiguration The application configuration
Expand All @@ -50,8 +49,6 @@ public DefaultKafkaStreamsConfiguration(KafkaDefaultConfiguration defaultConfigu
ApplicationConfiguration applicationConfiguration,
Environment environment) {
super(defaultConfiguration);
Properties config = getConfig();
config.putAll(defaultConfiguration.getConfig());
init(applicationConfiguration, environment, config);
init(applicationConfiguration, environment, getConfig());
}
}
Original file line number Diff line number Diff line change
@@ -1,65 +1,18 @@
/*
* Copyright 2017-2019 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.runtime.ApplicationConfiguration;

import java.util.Properties;

import static io.micronaut.configuration.kafka.streams.KafkaStreamsConfiguration.PREFIX;

/**
* The default configuration passed to {@link org.apache.kafka.streams.KafkaStreams}.
* Interface used in {@link ConfiguredStreamBuilder}.
*
* @param <K> The generic key type
* @param <V> The generic value type
* @since 2.0
* @author bobby
*/
@EachProperty(value = PREFIX, primary = "default")
@Requires(beans = KafkaDefaultConfiguration.class)
public class KafkaStreamsConfiguration<K, V> extends AbstractKafkaStreamsConfiguration<K, V> {

/**
* The default streams configuration.
*/
public static final String PREFIX = "kafka.streams";


public interface KafkaStreamsConfiguration {
/**
* Construct a new {@link KafkaStreamsConfiguration} for the given defaults.
* The configuration.
*
* @param streamName The stream name
* @param defaultConfiguration The default configuration
* @param applicationConfiguration The application configuration
* @param environment The environment
* @return Properties for the streams configuration
*/
public KafkaStreamsConfiguration(
@Parameter String streamName,
KafkaDefaultConfiguration defaultConfiguration,
ApplicationConfiguration applicationConfiguration,
Environment environment) {
super(defaultConfiguration);
Properties config = getConfig();
String propertyKey = PREFIX + '.' + NameUtils.hyphenate(streamName, true);
config.putAll(environment.getProperty(propertyKey, Properties.class).orElseGet(Properties::new));
init(applicationConfiguration, environment, config);
}
Properties getConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Factory;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;

import javax.annotation.PreDestroy;
import javax.inject.Singleton;
Expand All @@ -28,7 +27,6 @@
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedDeque;


/**
* A factory that constructs the {@link KafkaStreams} bean.
*
Expand All @@ -40,30 +38,16 @@ public class KafkaStreamsFactory implements Closeable {

private final Collection<KafkaStreams> streams = new ConcurrentLinkedDeque<>();

/**
* Exposes the {@link ConfiguredStreamBuilder} as a bean.
*
* @param configuration The configuration
* @return The streams builder
*/
@EachBean(AbstractKafkaStreamsConfiguration.class)
ConfiguredStreamBuilder streamsBuilder(AbstractKafkaStreamsConfiguration configuration) {
return new ConfiguredStreamBuilder(configuration.getConfig());
}

/**
* Builds the default {@link KafkaStreams} bean from the configuration and the supplied {@link ConfiguredStreamBuilder}.
*
* @param builder The builder
* @param kStreams The KStream definitions
* @param config The builder
* @return The {@link KafkaStreams} bean
*/
@EachBean(ConfiguredStreamBuilder.class)
@EachBean(KafkaStreamsConfiguration.class)
@Context
KafkaStreams kafkaStreams(
ConfiguredStreamBuilder builder,
// required for initialization. DO NOT DELETE
KStream... kStreams) {
KafkaStreams kafkaStreams(KafkaStreamsConfiguration config) {
ConfiguredStreamBuilder builder = new ConfiguredStreamBuilder(config);
KafkaStreams kafkaStreams = new KafkaStreams(
builder.build(),
builder.getConfiguration()
Expand Down Expand Up @@ -94,5 +78,4 @@ public void close() {
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2017-2019 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.runtime.ApplicationConfiguration;

import java.util.Properties;

import static io.micronaut.configuration.kafka.streams.NamedKafkaStreamsConfiguration.PREFIX;

/**
* The default configuration passed to {@link org.apache.kafka.streams.KafkaStreams}.
*
* @param <K> The generic key type
* @param <V> The generic value type
*/
@EachProperty(value = PREFIX, primary = "default")
@Requires(beans = KafkaDefaultConfiguration.class)
public class NamedKafkaStreamsConfiguration<K, V> extends AbstractKafkaStreamsConfiguration<K, V> {

/**
* The default streams configuration.
*/
public static final String PREFIX = "kafka.streams";


/**
* Construct a new {@link NamedKafkaStreamsConfiguration} for the given defaults.
*
* @param streamName The stream name
* @param defaultConfiguration The default configuration
* @param applicationConfiguration The application configuration
* @param environment The environment
*/
public NamedKafkaStreamsConfiguration(
@Parameter String streamName,
KafkaDefaultConfiguration defaultConfiguration,
ApplicationConfiguration applicationConfiguration,
Environment environment) {
super(defaultConfiguration);
Properties config = getConfig();
String propertyKey = PREFIX + '.' + NameUtils.hyphenate(streamName, true);
config.putAll(environment.getProperty(propertyKey, Properties.class).orElseGet(Properties::new));
init(applicationConfiguration, environment, config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

import javax.inject.Qualifier
import javax.naming.Name
import javax.validation.groups.Default

class KafkaStreamsSpec extends Specification {

@Shared
Expand All @@ -47,7 +51,8 @@ class KafkaStreamsSpec extends Specification {

void "test config"() {
when:
def builder = context.getBean(ConfiguredStreamBuilder, Qualifiers.byName('my-stream'))
def config = context.getBean(NamedKafkaStreamsConfiguration, Qualifiers.byName("my-stream"))
def builder = new ConfiguredStreamBuilder(config)

then:
builder.configuration['application.id'] == "my-stream"
Expand All @@ -56,7 +61,7 @@ class KafkaStreamsSpec extends Specification {

void "test config from stream"() {
when:
def stream = context.getBean(KafkaStreams, Qualifiers.byName('my-stream'))
def stream = context.getBean(KafkaStreams, Qualifiers.byName("my-stream"))

then:
stream.config.originals().get('application.id') == "my-stream"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// tag::imports[]

import io.micronaut.context.annotation.Factory;
import kafka.Kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -48,8 +49,9 @@ public class WordCountStream {
// tag::wordCountStream[]
@Singleton
@Named(STREAM_WORD_COUNT)
KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { // <3>
KStream<String, String> wordCountStream(NamedKafkaStreamsConfiguration config) { // <3>
// set default serdes
ConfiguredStreamBuilder builder = new ConfiguredStreamBuilder(config);
Properties props = builder.getConfiguration();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
Expand Down Expand Up @@ -82,10 +84,11 @@ KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { // <3
@Singleton
@Named(MY_STREAM)
KStream<String, String> myStream(
@Named(MY_STREAM) ConfiguredStreamBuilder builder) {
@Named(MY_STREAM) NamedKafkaStreamsConfiguration config) {

// end::namedStream[]
// set default serdes
ConfiguredStreamBuilder builder = new ConfiguredStreamBuilder(config);
Properties props = builder.getConfiguration();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
Expand Down