Skip to content

Commit

Permalink
Use Awaitility
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 17, 2025
1 parent b8b23e2 commit 90f6856
Show file tree
Hide file tree
Showing 14 changed files with 402 additions and 157 deletions.
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ junitVersion=5.11.4
mockitoVersion=5.15.2
assertJVersion=3.27.2
log4jVersion=2.24.3
awaitilityVersion=4.2.2
org.gradle.jvmargs=-Xmx4096m
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.bakdata.kafka;

import static com.bakdata.kafka.KafkaTest.awaitAtMost;
import static com.bakdata.kafka.KafkaTest.newCluster;
import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -41,8 +42,10 @@

class CliTest {

private static void runApp(final KafkaStreamsApplication<?> app, final String... args) {
new Thread(() -> KafkaApplication.startApplication(app, args)).start();
private static Thread runApp(final KafkaStreamsApplication<?> app, final String... args) {
final Thread thread = new Thread(() -> KafkaApplication.startApplication(app, args));
thread.start();
return thread;
}

@Test
Expand Down Expand Up @@ -209,7 +212,7 @@ public SerdeConfig defaultSerializationConfig() {

@Test
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorInTopology() throws InterruptedException {
void shouldExitWithErrorInTopology() {
final String input = "input";
try (final KafkaContainer kafkaCluster = newCluster();
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
Expand All @@ -233,15 +236,15 @@ public SerdeConfig defaultSerializationConfig() {
})) {
kafkaCluster.start();

runApp(app,
final Thread thread = runApp(app,
"--bootstrap-server", kafkaCluster.getBootstrapServers(),
"--input-topics", input
);
new KafkaTestClient(KafkaEndpointConfig.builder()
.bootstrapServers(kafkaCluster.getBootstrapServers())
.build()).send()
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
Thread.sleep(Duration.ofSeconds(10).toMillis());
awaitAtMost(Duration.ofSeconds(10L)).until(() -> !thread.isAlive());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class RunProducerAppTest extends KafkaTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);

@Test
void shouldRunApp() throws InterruptedException {
void shouldRunApp() {
final String output = "output";
try (final KafkaProducerApplication<?> app = new SimpleKafkaProducerApplication<>(() -> new ProducerApp() {
@Override
Expand Down Expand Up @@ -84,7 +84,6 @@ public SerializerConfig defaultSerializationConfig() {
assertThat(kv.value().getContent()).isEqualTo("bar");
});
app.clean();
Thread.sleep(TIMEOUT.toMillis());
try (final ImprovedAdminClient admin = testClient.admin()) {
assertThat(admin.getTopicClient().exists(app.getOutputTopic()))
.as("Output topic is deleted")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,8 @@ class StreamsCleanUpTest extends KafkaTest {
@InjectSoftAssertions
private SoftAssertions softly;

private static void runAppAndClose(final KafkaStreamsApplication<?> app) throws InterruptedException {
runApp(app);
app.stop();
}

private static void runApp(final KafkaStreamsApplication<?> app) throws InterruptedException {
// run in Thread because the application blocks indefinitely
new Thread(app).start();
// Wait until stream application has consumed all data
Thread.sleep(TIMEOUT.toMillis());
}

@Test
void shouldClean() throws InterruptedException {
void shouldClean() {
try (final KafkaStreamsApplication<?> app = this.createWordCountApplication()) {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getOutputTopic());
Expand All @@ -94,7 +82,7 @@ void shouldClean() throws InterruptedException {
this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app);

// Wait until all stream application are completely stopped before triggering cleanup
Thread.sleep(TIMEOUT.toMillis());
this.awaitClosed(app.createConfiguredApp().getUniqueAppId(), TIMEOUT);
app.clean();

try (final ImprovedAdminClient admin = testClient.admin()) {
Expand All @@ -109,7 +97,7 @@ void shouldClean() throws InterruptedException {
}

@Test
void shouldReset() throws InterruptedException {
void shouldReset() {
try (final KafkaStreamsApplication<?> app = this.createWordCountApplication()) {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getOutputTopic());
Expand All @@ -128,7 +116,7 @@ void shouldReset() throws InterruptedException {
this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app);

// Wait until all stream application are completely stopped before triggering cleanup
Thread.sleep(TIMEOUT.toMillis());
this.awaitClosed(app.createConfiguredApp().getUniqueAppId(), TIMEOUT);
app.reset();

try (final ImprovedAdminClient admin = testClient.admin()) {
Expand All @@ -145,21 +133,31 @@ void shouldReset() throws InterruptedException {
}

@Test
void shouldCallClose() throws InterruptedException {
void shouldCallClose() {
try (final CloseFlagApp app = this.createCloseFlagApplication()) {
this.newTestClient().createTopic(app.getInputTopics().get(0));
Thread.sleep(TIMEOUT.toMillis());
this.softly.assertThat(app.isClosed()).isFalse();
this.softly.assertThat(app.isAppClosed()).isFalse();
app.clean();
this.softly.assertThat(app.isAppClosed()).isTrue();
app.setAppClosed(false);
Thread.sleep(TIMEOUT.toMillis());
app.reset();
this.softly.assertThat(app.isAppClosed()).isTrue();
}
}

private void runAppAndClose(final KafkaStreamsApplication<?> app) {
this.runApp(app);
app.stop();
}

private void runApp(final KafkaStreamsApplication<?> app) {
// run in Thread because the application blocks indefinitely
new Thread(app).start();
// Wait until stream application has consumed all data
this.awaitProcessing(app.createConfiguredApp().getUniqueAppId(), TIMEOUT);
}

private CloseFlagApp createCloseFlagApplication() {
final CloseFlagApp app = new CloseFlagApp();
app.setInputTopics(List.of("input"));
Expand All @@ -177,9 +175,8 @@ private List<KeyValue<String, Long>> readOutputTopic(final String outputTopic) {
}

private void runAndAssertContent(final Iterable<? extends KeyValue<String, Long>> expectedValues,
final String description, final KafkaStreamsApplication<?> app)
throws InterruptedException {
runAppAndClose(app);
final String description, final KafkaStreamsApplication<?> app) {
this.runAppAndClose(app);

final List<KeyValue<String, Long>> output = this.readOutputTopic(app.getOutputTopic());
this.softly.assertThat(output)
Expand Down
2 changes: 2 additions & 0 deletions streams-bootstrap-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ dependencies {
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
val log4jVersion: String by project
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
val awaitilityVersion: String by project
testFixturesApi(group = "org.awaitility", name = "awaitility", version = awaitilityVersion)
}

tasks.withType<Test> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;

/**
Expand Down Expand Up @@ -68,6 +71,14 @@ private static KafkaAdminException failedToListGroups(final Throwable ex) {
return new KafkaAdminException("Failed to list consumer groups", ex);
}

private static KafkaAdminException failedToListOffsets(final String groupName, final Throwable ex) {
return new KafkaAdminException("Failed to list offsets for consumer group" + groupName, ex);
}

private static KafkaAdminException failedToDescribeGroup(final String groupName, final Throwable ex) {
return new KafkaAdminException("Failed to describe consumer group" + groupName, ex);
}

/**
* Delete a consumer group.
*
Expand All @@ -93,6 +104,63 @@ public void deleteConsumerGroup(final String groupName) {
}
}

/**
* Describe a consumer group.
*
* @param groupName the consumer group name
* @return consumer group description
*/
public ConsumerGroupDescription describe(final String groupName) {
log.info("Describing consumer group '{}'", groupName);
try {
final ConsumerGroupDescription description =
this.adminClient.describeConsumerGroups(List.of(groupName))
.all()
.get(this.timeout.toSeconds(), TimeUnit.SECONDS)
.get(groupName);
log.info("Described consumer group '{}'", groupName);
return description;
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw failedToDescribeGroup(groupName, ex);
} catch (final ExecutionException ex) {
if (ex.getCause() instanceof RuntimeException) {
throw (RuntimeException) ex.getCause();
}
throw failedToDescribeGroup(groupName, ex);
} catch (final TimeoutException ex) {
throw failedToDescribeGroup(groupName, ex);
}
}

/**
* List offsets for a consumer group.
*
* @param groupName the consumer group name
* @return consumer group offsets
*/
public Map<TopicPartition, OffsetAndMetadata> listOffsets(final String groupName) {
log.info("Listing offsets for consumer group '{}'", groupName);
try {
final Map<TopicPartition, OffsetAndMetadata> offsets =
this.adminClient.listConsumerGroupOffsets(groupName)
.partitionsToOffsetAndMetadata(groupName)
.get(this.timeout.toSeconds(), TimeUnit.SECONDS);
log.info("Listed offsets for consumer group '{}'", groupName);
return offsets;
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw failedToListOffsets(groupName, ex);
} catch (final ExecutionException ex) {
if (ex.getCause() instanceof RuntimeException) {
throw (RuntimeException) ex.getCause();
}
throw failedToListOffsets(groupName, ex);
} catch (final TimeoutException ex) {
throw failedToListOffsets(groupName, ex);
}
}

@Override
public void close() {
this.adminClient.close();
Expand Down
Loading

0 comments on commit 90f6856

Please sign in to comment.