Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jul 23, 2024
1 parent 1798b84 commit c746819
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private Map<String, Object> createKafkaProperties(final Map<String, Object> conf
this.putAll(EnvironmentStreamsConfigParser.parseVariables(System.getenv()));
this.putAll(KafkaPropertiesFactory.this.configuration.getKafkaConfig());
this.putAll(KafkaPropertiesFactory.this.endpointConfig.createKafkaProperties());
this.putAll(serializationConfig.createProperties());
this.putAllValidating(serializationConfig.createProperties());
this.putAllValidating(configOverrides);
return Collections.unmodifiableMap(this.kafkaConfig);
}
Expand All @@ -72,7 +72,7 @@ private void validateNotSet(final Map<String, ?> configs) {

private void validateNotSet(final String key) {
if (this.kafkaConfig.containsKey(key)) {
throw new IllegalArgumentException(String.format("'%s' has been configured already", key));
throw new IllegalArgumentException(String.format("'%s' should not be configured already", key));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.Map;
import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;
import org.apache.kafka.common.serialization.Serdes.LongSerde;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.SetEnvironmentVariable;

Expand Down Expand Up @@ -88,6 +91,51 @@ void shouldSetDefaultSerde() {
.containsEntry(DEFAULT_VALUE_SERDE_CLASS_CONFIG, LongSerde.class);
}

@Test
void shouldThrowIfKeySerdeHasBeenConfiguredDifferently() {
final AppConfiguration<StreamsTopicConfig> configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of(
DEFAULT_KEY_SERDE_CLASS_CONFIG, ByteArraySerde.class
));
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<>(new TestApplication(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
.brokers("fake")
.schemaRegistryUrl("fake")
.build()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'default.key.serde' should not be configured already");
}

@Test
void shouldThrowIfValueSerdeHasBeenConfiguredDifferently() {
final AppConfiguration<StreamsTopicConfig> configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of(
DEFAULT_VALUE_SERDE_CLASS_CONFIG, ByteArraySerde.class
));
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<>(new TestApplication(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
.brokers("fake")
.schemaRegistryUrl("fake")
.build()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'default.value.serde' should not be configured already");
}

@Test
void shouldThrowIfAppIdHasBeenConfiguredDifferently() {
final AppConfiguration<StreamsTopicConfig> configuration = new AppConfiguration<>(emptyTopicConfig(), Map.of(
StreamsConfig.APPLICATION_ID_CONFIG, "my-app"
));
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<>(new TestApplication(), configuration);
assertThatThrownBy(() -> configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
.brokers("fake")
.schemaRegistryUrl("fake")
.build()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("'application.id' should not be configured already");
}

private static class TestApplication implements StreamsApp {

@Override
Expand Down

0 comments on commit c746819

Please sign in to comment.