Skip to content

Commit

Permalink
Merge branch 'feature/topology-builder' of github.com:bakdata/streams…
Browse files Browse the repository at this point in the history
…-bootstrap into feature/host-info

# Conflicts:
#	streams-bootstrap-core/src/test/java/com/bakdata/kafka/ImprovedStreamsConfigTest.java
  • Loading branch information
raminqaf committed Jun 25, 2024
2 parents 1bfe392 + 9addc66 commit add7275
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_SERVER_CONFIG;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.NonNull;
Expand Down Expand Up @@ -54,11 +55,11 @@ public String getAppId() {

/**
* Get the bootstrap servers of the underlying {@link StreamsConfig}
* @return comma separated bootstrap servers config
* @return list of bootstrap servers
* @see StreamsConfig#BOOTSTRAP_SERVERS_CONFIG
*/
public String getBoostrapServers() {
return String.join(",", this.streamsConfig.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
public List<String> getBoostrapServers() {
return this.streamsConfig.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public static void runResetter(final Collection<String> inputTopics, final Colle
final File tempFile = createTemporaryPropertiesFile(appId, streamsAppConfig.getKafkaProperties());
final ImmutableList.Builder<String> argList = ImmutableList.<String>builder()
.add("--application-id", appId)
.add("--bootstrap-server", streamsAppConfig.getBoostrapServers())
.add("--bootstrap-server", String.join(",", streamsAppConfig.getBoostrapServers()))
.add("--config-file", tempFile.toString());
final Collection<String> existingInputTopics = filterExistingTopics(inputTopics, allTopics);
if (!existingInputTopics.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,49 +30,72 @@
import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.HostInfo;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(SoftAssertionsExtension.class)
class ImprovedStreamsConfigTest {

@InjectSoftAssertions
private SoftAssertions softly;

@Test
void shouldGetAppId() {
final StreamsConfig config = new StreamsConfig(
Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, List.of("broker1:9092", "broker2:9092", "broker3:9092")
Map.of(
StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"
)
);
assertThat(new ImprovedStreamsConfig(config).getAppId())
this.softly.assertThat(new ImprovedStreamsConfig(config).getAppId())
.isEqualTo("test-app");
}

@Test
void shouldGetBrokerAddress() {
void shouldGetBootstrapServersFromList() {
final StreamsConfig config = new StreamsConfig(
Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
Map.of(
StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, List.of("broker1:9092", "broker2:9092", "broker3:9092")
)
);
assertThat(new ImprovedStreamsConfig(config).getBoostrapServers())
.isEqualTo("broker1:9092,broker2:9092,broker3:9092");
this.softly.assertThat(new ImprovedStreamsConfig(config).getBoostrapServers())
.isEqualTo(List.of("broker1:9092", "broker2:9092", "broker3:9092"));
}

@Test
void shouldGetBootstrapServersFromString() {
final StreamsConfig config = new StreamsConfig(
Map.of(
StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092"
)
);
this.softly.assertThat(new ImprovedStreamsConfig(config).getBoostrapServers())
.isEqualTo(List.of("broker1:9092", "broker2:9092", "broker3:9092"));
}

@Test
void shouldGetOriginalKafkaProperties() {
final StreamsConfig config = new StreamsConfig(
Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
Map.of(
StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"
)
);
assertThat(new ImprovedStreamsConfig(config).getStreamsConfig())
this.softly.assertThat(new ImprovedStreamsConfig(config).getKafkaProperties())
.hasSize(2)
.anySatisfy((key, value) -> {
assertThat(key).isEqualTo(StreamsConfig.APPLICATION_ID_CONFIG);
assertThat(value).isEqualTo("test-app");
this.softly.assertThat(key).isEqualTo(StreamsConfig.APPLICATION_ID_CONFIG);
this.softly.assertThat(value).isEqualTo("test-app");
})
.anySatisfy((key, value) -> {
assertThat(key).isEqualTo(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
assertThat(value).isEqualTo("broker1:9092");
})
;
this.softly.assertThat(key).isEqualTo(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
this.softly.assertThat(value).isEqualTo("broker1:9092");
});
}

@Test
Expand All @@ -81,7 +104,7 @@ void shouldHaveHostInfoIfApplicationServiceIsConfigure() {
Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092",
StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:9090"));
assertThat(new ImprovedStreamsConfig(config).getApplicationServer())
this.softly.assertThat(new ImprovedStreamsConfig(config).getApplicationServer())
.hasValue(new HostInfo("localhost", 9090));
}

Expand All @@ -90,7 +113,7 @@ void shouldReturnEmptyHostInfoIfApplicationServiceIsNotConfigure() {
final StreamsConfig config = new StreamsConfig(
Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app",
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
assertThat(new ImprovedStreamsConfig(config).getApplicationServer())
this.softly.assertThat(new ImprovedStreamsConfig(config).getApplicationServer())
.isEmpty();
}

Expand Down

0 comments on commit add7275

Please sign in to comment.