Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 17, 2025
1 parent f2fd574 commit 5637380
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.bakdata.kafka.integration;

import static com.bakdata.kafka.KafkaTestClient.defaultTopicSettings;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -42,8 +41,6 @@
import com.bakdata.kafka.TopologyBuilder;
import com.bakdata.kafka.test_applications.LabeledInputTopics;
import com.bakdata.kafka.test_applications.Mirror;
import com.bakdata.kafka.util.ImprovedAdminClient;
import com.bakdata.kafka.util.TopicClient;
import java.lang.Thread.UncaughtExceptionHandler;
import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -154,13 +151,9 @@ void shouldUseMultipleLabeledInputTopics() {
final String inputTopic2 = inputTopics.get(1);
final String outputTopic = app.getTopics().getOutputTopic();
final KafkaTestClient testClient = this.newTestClient();
try (final ImprovedAdminClient admin = testClient.admin()) {
try (final TopicClient topicClient = admin.getTopicClient()) {
topicClient.createTopic(inputTopic1, defaultTopicSettings());
topicClient.createTopic(inputTopic2, defaultTopicSettings());
topicClient.createTopic(outputTopic, defaultTopicSettings());
}
}
testClient.createTopic(inputTopic1);
testClient.createTopic(inputTopic2);
testClient.createTopic(outputTopic);
run(runner);
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
package com.bakdata.kafka.util;


import static com.bakdata.kafka.KafkaTestClient.defaultTopicSettings;

import com.bakdata.kafka.KafkaTest;
import com.bakdata.kafka.KafkaTestClient;
import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
Expand Down Expand Up @@ -62,7 +60,7 @@ void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet()
final KafkaTestClient testClient = this.newTestClient();
try (final ImprovedAdminClient admin = testClient.admin();
final TopicClient topicClient = admin.getTopicClient()) {
topicClient.createTopic(TOPIC, defaultTopicSettings());
topicClient.createTopic(TOPIC, KafkaTestClient.defaultTopicSettings().build());
this.softly.assertThat(topicClient.exists(TOPIC))
.as("Topic is created")
.isTrue();
Expand Down Expand Up @@ -95,7 +93,7 @@ void shouldResetSchema() throws InterruptedException, IOException, RestClientExc
final KafkaTestClient testClient = this.newTestClient();
try (final ImprovedAdminClient admin = testClient.admin();
final TopicClient topicClient = admin.getTopicClient()) {
topicClient.createTopic(TOPIC, defaultTopicSettings());
topicClient.createTopic(TOPIC, KafkaTestClient.defaultTopicSettings().build());
this.softly.assertThat(topicClient.exists(TOPIC))
.as("Topic is created")
.isTrue();
Expand Down Expand Up @@ -129,7 +127,7 @@ void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws Interr
final KafkaTestClient testClient = this.newTestClient();
try (final ImprovedAdminClient admin = testClient.admin();
final TopicClient topicClient = admin.getTopicClient()) {
topicClient.createTopic(TOPIC, defaultTopicSettings());
topicClient.createTopic(TOPIC, KafkaTestClient.defaultTopicSettings().build());
this.softly.assertThat(topicClient.exists(TOPIC))
.as("Topic is created")
.isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@

package com.bakdata.kafka.util;

import static com.bakdata.kafka.KafkaTestClient.defaultTopicSettings;
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.kafka.KafkaTest;
import com.bakdata.kafka.KafkaTestClient;
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClientConfig;
Expand All @@ -48,7 +48,7 @@ void shouldNotFindTopic() {
@Test
void shouldFindTopic() throws InterruptedException {
try (final TopicClient client = this.createClient()) {
client.createTopic("exists", defaultTopicSettings());
client.createTopic("exists", KafkaTestClient.defaultTopicSettings().build());
}
Thread.sleep(CLIENT_TIMEOUT.toMillis());
try (final TopicClient client = this.createClient()) {
Expand All @@ -59,8 +59,8 @@ void shouldFindTopic() throws InterruptedException {
@Test
void shouldListTopics() throws InterruptedException {
try (final TopicClient client = this.createClient()) {
client.createTopic("foo", defaultTopicSettings());
client.createTopic("bar", defaultTopicSettings());
client.createTopic("foo", KafkaTestClient.defaultTopicSettings().build());
client.createTopic("bar", KafkaTestClient.defaultTopicSettings().build());
}
Thread.sleep(CLIENT_TIMEOUT.toMillis());
try (final TopicClient client = this.createClient()) {
Expand All @@ -73,7 +73,7 @@ void shouldListTopics() throws InterruptedException {
@Test
void shouldDeleteTopic() throws InterruptedException {
try (final TopicClient client = this.createClient()) {
client.createTopic("foo", defaultTopicSettings());
client.createTopic("foo", KafkaTestClient.defaultTopicSettings().build());
}
Thread.sleep(CLIENT_TIMEOUT.toMillis());
try (final TopicClient client = this.createClient()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.bakdata.kafka.util.ImprovedAdminClient;
import com.bakdata.kafka.util.TopicClient;
import com.bakdata.kafka.util.TopicSettings;
import com.bakdata.kafka.util.TopicSettings.TopicSettingsBuilder;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand All @@ -37,13 +38,12 @@
@RequiredArgsConstructor
public class KafkaTestClient {

private static final TopicSettings DEFAULT_TOPIC_SETTINGS = TopicSettings.builder()
private static final TopicSettingsBuilder DEFAULT_TOPIC_SETTINGS = TopicSettings.builder()
.partitions(1)
.replicationFactor((short) 1)
.build();
.replicationFactor((short) 1);
private final @NonNull KafkaEndpointConfig endpointConfig;

public static TopicSettings defaultTopicSettings() {
public static TopicSettingsBuilder defaultTopicSettings() {
return DEFAULT_TOPIC_SETTINGS;
}

Expand Down Expand Up @@ -71,7 +71,7 @@ public void createTopic(final String topicName, final TopicSettings settings) {
}

public void createTopic(final String topicName) {
this.createTopic(topicName, defaultTopicSettings());
this.createTopic(topicName, defaultTopicSettings().build());
}

public boolean existsTopic(final String topicName) {
Expand Down

0 comments on commit 5637380

Please sign in to comment.