Skip to content

Commit

Permalink
Fix Test config and Test Config Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Wes Richardet committed Aug 29, 2019
1 parent 703ca1b commit 950a04f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ public class KafkaStreamsFactory implements Closeable {
/**
* Builds the default {@link KafkaStreams} bean from the configuration and the supplied {@link ConfiguredStreamBuilder}.
*
* @param builder The builder
* @param config The builder
* @return The {@link KafkaStreams} bean
*/
@EachBean(ConfiguredStreamBuilder.class)
@EachBean(KafkaStreamsConfiguration.class)
@Context
KafkaStreams kafkaStreams(ConfiguredStreamBuilder builder) {
KafkaStreams kafkaStreams(KafkaStreamsConfiguration config) {
ConfiguredStreamBuilder builder = new ConfiguredStreamBuilder(config);
KafkaStreams kafkaStreams = new KafkaStreams(
builder.build(),
builder.getConfiguration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

import javax.inject.Qualifier
import javax.naming.Name
import javax.validation.groups.Default

class KafkaStreamsSpec extends Specification {

@Shared
Expand All @@ -47,7 +51,8 @@ class KafkaStreamsSpec extends Specification {

void "test config"() {
when:
def builder = context.getBean(ConfiguredStreamBuilder, Qualifiers.byName('my-stream'))
def config = context.getBean(NamedKafkaStreamsConfiguration, Qualifiers.byName("my-stream"))
def builder = new ConfiguredStreamBuilder(config)

then:
builder.configuration['application.id'] == "my-stream"
Expand All @@ -56,7 +61,7 @@ class KafkaStreamsSpec extends Specification {

void "test config from stream"() {
when:
def stream = context.getBean(KafkaStreams, Qualifiers.byName('my-stream'))
def stream = context.getBean(KafkaStreams, Qualifiers.byName("my-stream"))

then:
stream.config.originals().get('application.id') == "my-stream"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// tag::imports[]

import io.micronaut.context.annotation.Factory;
import kafka.Kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -48,8 +49,9 @@ public class WordCountStream {
// tag::wordCountStream[]
@Singleton
@Named(STREAM_WORD_COUNT)
KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { // <3>
KStream<String, String> wordCountStream(NamedKafkaStreamsConfiguration config) { // <3>
// set default serdes
ConfiguredStreamBuilder builder = new ConfiguredStreamBuilder(config);
Properties props = builder.getConfiguration();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
Expand Down Expand Up @@ -82,10 +84,11 @@ KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { // <3
@Singleton
@Named(MY_STREAM)
KStream<String, String> myStream(
@Named(MY_STREAM) ConfiguredStreamBuilder builder) {
@Named(MY_STREAM) NamedKafkaStreamsConfiguration config) {

// end::namedStream[]
// set default serdes
ConfiguredStreamBuilder builder = new ConfiguredStreamBuilder(config);
Properties props = builder.getConfiguration();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
Expand Down

0 comments on commit 950a04f

Please sign in to comment.