Skip to content

Commit

Permalink
Add HostInfo to RunningStreams
Browse files Browse the repository at this point in the history
  • Loading branch information
raminqaf committed Jun 25, 2024
1 parent b3b5ec1 commit fc5784b
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,31 @@

package com.bakdata.kafka;

import static org.apache.kafka.streams.StreamsConfig.APPLICATION_SERVER_CONFIG;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import lombok.NonNull;
import lombok.Value;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.HostInfo;

/**
* Class for simplified access to configs provided by {@link StreamsConfig}
*/
@Value
public class ImprovedStreamsConfig {

@NonNull StreamsConfig kafkaProperties;
@NonNull StreamsConfig streamsConfig;

/**
* Get the application id of the underlying {@link StreamsConfig}
* @return application id
* @see StreamsConfig#APPLICATION_ID_CONFIG
*/
public String getAppId() {
return this.kafkaProperties.getString(StreamsConfig.APPLICATION_ID_CONFIG);
return this.streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG);
}

/**
Expand All @@ -53,15 +57,33 @@ public String getAppId() {
* @see StreamsConfig#BOOTSTRAP_SERVERS_CONFIG
*/
public String getBoostrapServers() {
return String.join(",", this.kafkaProperties.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
return String.join(",", this.streamsConfig.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
}

/**
* Get all configs of the underlying {@link StreamsConfig}
* @return Kafka configs
* @see StreamsConfig#originals()
*/
public Map<String, Object> getKafkaProperties() {
return Collections.unmodifiableMap(this.kafkaProperties.originals());
public Map<String, Object> getStreamsConfig() {
return Collections.unmodifiableMap(this.streamsConfig.originals());
}

/**
* Retrieves the host information based on the application server configuration.
*
* @return an {@code Optional} containing the {@link HostInfo} if the
* {@link StreamsConfig#APPLICATION_SERVER_CONFIG} is set; otherwise, an empty {@code Optional}.
*/
public Optional<HostInfo> getApplicationServer() {
final String applicationServerConfig = this.streamsConfig.getString(APPLICATION_SERVER_CONFIG);
return applicationServerConfig.isEmpty() ? Optional.empty()
: Optional.of(createHostInfo(applicationServerConfig));
}

private static HostInfo createHostInfo(final String applicationServerConfig) {
final String[] hostAndPort = applicationServerConfig.split(":");
return new HostInfo(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@

package com.bakdata.kafka;

import static org.apache.kafka.streams.StreamsConfig.APPLICATION_SERVER_CONFIG;

import com.bakdata.kafka.StreamsExecutionOptions.StreamsExecutionOptionsBuilder;
import java.util.Optional;
import java.util.function.Consumer;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.HostInfo;

/**
* A running {@link KafkaStreams} instance along with its {@link StreamsConfig} and
Expand All @@ -44,7 +48,7 @@
public class RunningStreams {

@NonNull
StreamsConfig config;
ImprovedStreamsConfig config;
@NonNull
Topology topology;
@NonNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public static void runResetter(final Collection<String> inputTopics, final Colle
final Collection<String> allTopics, final ImprovedStreamsConfig streamsAppConfig) {
// StreamsResetter's internal AdminClient can only be configured with a properties file
final String appId = streamsAppConfig.getAppId();
final File tempFile = createTemporaryPropertiesFile(appId, streamsAppConfig.getKafkaProperties());
final File tempFile = createTemporaryPropertiesFile(appId, streamsAppConfig.getStreamsConfig());
final ImmutableList.Builder<String> argList = ImmutableList.<String>builder()
.add("--application-id", appId)
.add("--bootstrap-server", streamsAppConfig.getBoostrapServers())
Expand Down Expand Up @@ -183,7 +183,7 @@ public void reset() {
}

private Map<String, Object> getKafkaProperties() {
return this.config.getKafkaProperties();
return this.config.getStreamsConfig();
}

private ImprovedAdminClient createAdminClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private void runStreams() {
log.info("Calling start hook");
final RunningStreams runningStreams = RunningStreams.builder()
.streams(this.streams)
.config(this.config)
.config(new ImprovedStreamsConfig(this.config))
.topology(this.topology)
.build();
this.executionOptions.onStart(runningStreams);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.HostInfo;
import org.junit.jupiter.api.Test;

class ImprovedStreamsConfigTest {
@Test
void shouldGetAppId() {
final StreamsConfig config = new StreamsConfig(
Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, List.of("broker1:9092", "broker2:9092", "broker3:9092")
)
);
assertThat(new ImprovedStreamsConfig(config).getAppId())
.isEqualTo("test-app");
}

@Test
void shouldGetBrokerAddress() {
final StreamsConfig config = new StreamsConfig(
Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, List.of("broker1:9092", "broker2:9092", "broker3:9092")
)
);
assertThat(new ImprovedStreamsConfig(config).getBoostrapServers())
.isEqualTo("broker1:9092,broker2:9092,broker3:9092");
}

@Test
void shouldGetOriginalKafkaProperties() {
final StreamsConfig config = new StreamsConfig(
Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"
)
);
assertThat(new ImprovedStreamsConfig(config).getStreamsConfig())
.hasSize(2)
.anySatisfy((key, value) -> {
assertThat(key).isEqualTo(StreamsConfig.APPLICATION_ID_CONFIG);
assertThat(value).isEqualTo("test-app");
})
.anySatisfy((key, value) -> {
assertThat(key).isEqualTo(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
assertThat(value).isEqualTo("broker1:9092");
})
;
}

@Test
void shouldHaveHostInfoIfApplicationServiceIsConfigure() {
final StreamsConfig config = new StreamsConfig(
Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092",
StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:9090"));
assertThat(new ImprovedStreamsConfig(config).getApplicationServer())
.hasValue(new HostInfo("localhost", 9090));
}

@Test
void shouldReturnEmptyHostInfoIfApplicationServiceIsNotConfigure() {
final StreamsConfig config = new StreamsConfig(
Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
assertThat(new ImprovedStreamsConfig(config).getApplicationServer())
.isEmpty();
}

}

0 comments on commit fc5784b

Please sign in to comment.