diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedStreamsConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedStreamsConfig.java index 0b92731d..2d7cb8df 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedStreamsConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedStreamsConfig.java @@ -24,11 +24,15 @@ package com.bakdata.kafka; +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_SERVER_CONFIG; + import java.util.Collections; import java.util.Map; +import java.util.Optional; import lombok.NonNull; import lombok.Value; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.HostInfo; /** * Class for simplified access to configs provided by {@link StreamsConfig} @@ -36,7 +40,7 @@ @Value public class ImprovedStreamsConfig { - @NonNull StreamsConfig kafkaProperties; + @NonNull StreamsConfig streamsConfig; /** * Get the application id of the underlying {@link StreamsConfig} @@ -44,7 +48,7 @@ public class ImprovedStreamsConfig { * @see StreamsConfig#APPLICATION_ID_CONFIG */ public String getAppId() { - return this.kafkaProperties.getString(StreamsConfig.APPLICATION_ID_CONFIG); + return this.streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG); } /** @@ -53,7 +57,7 @@ public String getAppId() { * @see StreamsConfig#BOOTSTRAP_SERVERS_CONFIG */ public String getBoostrapServers() { - return String.join(",", this.kafkaProperties.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)); + return String.join(",", this.streamsConfig.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)); } /** @@ -61,7 +65,25 @@ public String getBoostrapServers() { * @return Kafka configs * @see StreamsConfig#originals() */ - public Map getKafkaProperties() { - return Collections.unmodifiableMap(this.kafkaProperties.originals()); + public Map getStreamsConfig() { + return Collections.unmodifiableMap(this.streamsConfig.originals()); + } + + /** + * Retrieves the host information based on the application server configuration. + * + * @return an {@code Optional} containing the {@link HostInfo} if the + * {@link StreamsConfig#APPLICATION_SERVER_CONFIG} is set; otherwise, an empty {@code Optional}. + */ + public Optional getApplicationServer() { + final String applicationServerConfig = this.streamsConfig.getString(APPLICATION_SERVER_CONFIG); + return applicationServerConfig.isEmpty() ? Optional.empty() + : Optional.of(createHostInfo(applicationServerConfig)); + } + + private static HostInfo createHostInfo(final String applicationServerConfig) { + final String[] hostAndPort = applicationServerConfig.split(":"); + return new HostInfo(hostAndPort[0], Integer.parseInt(hostAndPort[1])); } + } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/RunningStreams.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/RunningStreams.java index 70a68f92..0a4e5a46 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/RunningStreams.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/RunningStreams.java @@ -24,7 +24,10 @@ package com.bakdata.kafka; +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_SERVER_CONFIG; + import com.bakdata.kafka.StreamsExecutionOptions.StreamsExecutionOptionsBuilder; +import java.util.Optional; import java.util.function.Consumer; import lombok.Builder; import lombok.NonNull; @@ -32,6 +35,7 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.state.HostInfo; /** * A running {@link KafkaStreams} instance along with its {@link StreamsConfig} and @@ -44,7 +48,7 @@ public class RunningStreams { @NonNull - StreamsConfig config; + ImprovedStreamsConfig config; @NonNull Topology topology; @NonNull diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java index 7246d138..61f6e13d 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java @@ -99,7 +99,7 @@ public static void runResetter(final Collection inputTopics, final Colle final Collection allTopics, final ImprovedStreamsConfig streamsAppConfig) { // StreamsResetter's internal AdminClient can only be configured with a properties file final String appId = streamsAppConfig.getAppId(); - final File tempFile = createTemporaryPropertiesFile(appId, streamsAppConfig.getKafkaProperties()); + final File tempFile = createTemporaryPropertiesFile(appId, streamsAppConfig.getStreamsConfig()); final ImmutableList.Builder argList = ImmutableList.builder() .add("--application-id", appId) .add("--bootstrap-server", streamsAppConfig.getBoostrapServers()) @@ -183,7 +183,7 @@ public void reset() { } private Map getKafkaProperties() { - return this.config.getKafkaProperties(); + return this.config.getStreamsConfig(); } private ImprovedAdminClient createAdminClient() { diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsRunner.java index ce11a30b..24edab90 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsRunner.java @@ -113,7 +113,7 @@ private void runStreams() { log.info("Calling start hook"); final RunningStreams runningStreams = RunningStreams.builder() .streams(this.streams) - .config(this.config) + .config(new ImprovedStreamsConfig(this.config)) .topology(this.topology) .build(); this.executionOptions.onStart(runningStreams); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ImprovedStreamsConfigTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ImprovedStreamsConfigTest.java new file mode 100644 index 00000000..bb17b0c2 --- /dev/null +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ImprovedStreamsConfigTest.java @@ -0,0 +1,97 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.HostInfo; +import org.junit.jupiter.api.Test; + +class ImprovedStreamsConfigTest { + @Test + void shouldGetAppId() { + final StreamsConfig config = new StreamsConfig( + Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app", + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, List.of("broker1:9092", "broker2:9092", "broker3:9092") + ) + ); + assertThat(new ImprovedStreamsConfig(config).getAppId()) + .isEqualTo("test-app"); + } + + @Test + void shouldGetBrokerAddress() { + final StreamsConfig config = new StreamsConfig( + Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app", + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, List.of("broker1:9092", "broker2:9092", "broker3:9092") + ) + ); + assertThat(new ImprovedStreamsConfig(config).getBoostrapServers()) + .isEqualTo("broker1:9092,broker2:9092,broker3:9092"); + } + + @Test + void shouldGetOriginalKafkaProperties() { + final StreamsConfig config = new StreamsConfig( + Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app", + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092" + ) + ); + assertThat(new ImprovedStreamsConfig(config).getStreamsConfig()) + .hasSize(2) + .anySatisfy((key, value) -> { + assertThat(key).isEqualTo(StreamsConfig.APPLICATION_ID_CONFIG); + assertThat(value).isEqualTo("test-app"); + }) + .anySatisfy((key, value) -> { + assertThat(key).isEqualTo(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); + assertThat(value).isEqualTo("broker1:9092"); + }) + ; + } + + @Test + void shouldHaveHostInfoIfApplicationServiceIsConfigure() { + final StreamsConfig config = new StreamsConfig( + Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app", + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092", + StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:9090")); + assertThat(new ImprovedStreamsConfig(config).getApplicationServer()) + .hasValue(new HostInfo("localhost", 9090)); + } + + @Test + void shouldReturnEmptyHostInfoIfApplicationServiceIsNotConfigure() { + final StreamsConfig config = new StreamsConfig( + Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app", + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")); + assertThat(new ImprovedStreamsConfig(config).getApplicationServer()) + .isEmpty(); + } + +}