From 90f68565e7692c5a58d7a67a6795ea7e715a8721 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 17 Jan 2025 17:43:03 +0100 Subject: [PATCH] Use Awaitility --- gradle.properties | 1 + .../test/java/com/bakdata/kafka/CliTest.java | 13 +- .../kafka/integration/RunProducerAppTest.java | 3 +- .../kafka/integration/StreamsCleanUpTest.java | 41 +++--- streams-bootstrap-core/build.gradle.kts | 2 + .../kafka/util/ConsumerGroupClient.java | 68 ++++++++++ .../com/bakdata/kafka/util/TopicClient.java | 100 +++++++++------ .../integration/StreamsCleanUpRunnerTest.java | 117 +++++++++--------- .../kafka/integration/StreamsRunnerTest.java | 8 +- .../kafka/util/SchemaTopicClientTest.java | 11 +- .../bakdata/kafka/util/TopicClientTest.java | 42 ++++--- .../java/com/bakdata/kafka/KafkaTest.java | 55 ++++++++ streams-bootstrap-test/build.gradle.kts | 1 + .../com/bakdata/kafka/ProgressVerifier.java | 97 +++++++++++++++ 14 files changed, 402 insertions(+), 157 deletions(-) create mode 100644 streams-bootstrap-test/src/main/java/com/bakdata/kafka/ProgressVerifier.java diff --git a/gradle.properties b/gradle.properties index 86973b74..dc74ebe6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -10,4 +10,5 @@ junitVersion=5.11.4 mockitoVersion=5.15.2 assertJVersion=3.27.2 log4jVersion=2.24.3 +awaitilityVersion=4.2.2 org.gradle.jvmargs=-Xmx4096m diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index 5d9d6616..02fcad78 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import static com.bakdata.kafka.KafkaTest.awaitAtMost; import static com.bakdata.kafka.KafkaTest.newCluster; import static org.assertj.core.api.Assertions.assertThat; @@ -41,8 +42,10 @@ class CliTest { - private static void runApp(final KafkaStreamsApplication app, final String... args) { - new Thread(() -> KafkaApplication.startApplication(app, args)).start(); + private static Thread runApp(final KafkaStreamsApplication app, final String... 
args) { + final Thread thread = new Thread(() -> KafkaApplication.startApplication(app, args)); + thread.start(); + return thread; } @Test @@ -209,7 +212,7 @@ public SerdeConfig defaultSerializationConfig() { @Test @ExpectSystemExitWithStatus(1) - void shouldExitWithErrorInTopology() throws InterruptedException { + void shouldExitWithErrorInTopology() { final String input = "input"; try (final KafkaContainer kafkaCluster = newCluster(); final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() { @@ -233,7 +236,7 @@ public SerdeConfig defaultSerializationConfig() { })) { kafkaCluster.start(); - runApp(app, + final Thread thread = runApp(app, "--bootstrap-server", kafkaCluster.getBootstrapServers(), "--input-topics", input ); @@ -241,7 +244,7 @@ public SerdeConfig defaultSerializationConfig() { .bootstrapServers(kafkaCluster.getBootstrapServers()) .build()).send() .to(input, List.of(new SimpleProducerRecord<>("foo", "bar"))); - Thread.sleep(Duration.ofSeconds(10).toMillis()); + awaitAtMost(Duration.ofSeconds(10L)).until(() -> !thread.isAlive()); } } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index a752f3db..40f0e996 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -50,7 +50,7 @@ class RunProducerAppTest extends KafkaTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); @Test - void shouldRunApp() throws InterruptedException { + void shouldRunApp() { final String output = "output"; try (final KafkaProducerApplication app = new SimpleKafkaProducerApplication<>(() -> new ProducerApp() { @Override @@ -84,7 +84,6 @@ public SerializerConfig defaultSerializationConfig() { assertThat(kv.value().getContent()).isEqualTo("bar"); }); app.clean(); - Thread.sleep(TIMEOUT.toMillis()); try (final ImprovedAdminClient admin = testClient.admin()) { assertThat(admin.getTopicClient().exists(app.getOutputTopic())) .as("Output topic is deleted") diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 83f807d3..9c686325 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -62,20 +62,8 @@ class StreamsCleanUpTest extends KafkaTest { @InjectSoftAssertions private SoftAssertions softly; - private static void runAppAndClose(final KafkaStreamsApplication app) throws InterruptedException { - runApp(app); - app.stop(); - } - - private static void runApp(final KafkaStreamsApplication app) throws InterruptedException { - // run in Thread because the application blocks indefinitely - new Thread(app).start(); - // Wait until stream application has consumed all data - Thread.sleep(TIMEOUT.toMillis()); - } - @Test - void shouldClean() throws InterruptedException { + void shouldClean() { try (final KafkaStreamsApplication app = this.createWordCountApplication()) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getOutputTopic()); @@ -94,7 +82,7 @@ void shouldClean() throws InterruptedException { this.runAndAssertContent(expectedValues, "All entries are 
once in the input topic after the 1st run", app); // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(app.createConfiguredApp().getUniqueAppId(), TIMEOUT); app.clean(); try (final ImprovedAdminClient admin = testClient.admin()) { @@ -109,7 +97,7 @@ void shouldClean() throws InterruptedException { } @Test - void shouldReset() throws InterruptedException { + void shouldReset() { try (final KafkaStreamsApplication app = this.createWordCountApplication()) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getOutputTopic()); @@ -128,7 +116,7 @@ void shouldReset() throws InterruptedException { this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app); // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(app.createConfiguredApp().getUniqueAppId(), TIMEOUT); app.reset(); try (final ImprovedAdminClient admin = testClient.admin()) { @@ -145,21 +133,31 @@ void shouldReset() throws InterruptedException { } @Test - void shouldCallClose() throws InterruptedException { + void shouldCallClose() { try (final CloseFlagApp app = this.createCloseFlagApplication()) { this.newTestClient().createTopic(app.getInputTopics().get(0)); - Thread.sleep(TIMEOUT.toMillis()); this.softly.assertThat(app.isClosed()).isFalse(); this.softly.assertThat(app.isAppClosed()).isFalse(); app.clean(); this.softly.assertThat(app.isAppClosed()).isTrue(); app.setAppClosed(false); - Thread.sleep(TIMEOUT.toMillis()); app.reset(); this.softly.assertThat(app.isAppClosed()).isTrue(); } } + private void runAppAndClose(final KafkaStreamsApplication app) { + this.runApp(app); + app.stop(); + } + + private void runApp(final KafkaStreamsApplication app) { + // run in Thread because the application blocks indefinitely + new Thread(app).start(); + // Wait until stream application has consumed all data + this.awaitProcessing(app.createConfiguredApp().getUniqueAppId(), TIMEOUT); + } + private CloseFlagApp createCloseFlagApplication() { final CloseFlagApp app = new CloseFlagApp(); app.setInputTopics(List.of("input")); @@ -177,9 +175,8 @@ private List> readOutputTopic(final String outputTopic) { } private void runAndAssertContent(final Iterable> expectedValues, - final String description, final KafkaStreamsApplication app) - throws InterruptedException { - runAppAndClose(app); + final String description, final KafkaStreamsApplication app) { + this.runAppAndClose(app); final List> output = this.readOutputTopic(app.getOutputTopic()); this.softly.assertThat(output) diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts index 77d4a1f5..c5018061 100644 --- a/streams-bootstrap-core/build.gradle.kts +++ b/streams-bootstrap-core/build.gradle.kts @@ -38,6 +38,8 @@ dependencies { testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) val log4jVersion: String by project testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) + val awaitilityVersion: String by project + testFixturesApi(group = "org.awaitility", name = "awaitility", version = awaitilityVersion) } tasks.withType { diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java index 
b0322838..79d220a0 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java @@ -36,7 +36,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupIdNotFoundException; /** @@ -68,6 +71,14 @@ private static KafkaAdminException failedToListGroups(final Throwable ex) { return new KafkaAdminException("Failed to list consumer groups", ex); } + private static KafkaAdminException failedToListOffsets(final String groupName, final Throwable ex) { + return new KafkaAdminException("Failed to list offsets for consumer group " + groupName, ex); + } + + private static KafkaAdminException failedToDescribeGroup(final String groupName, final Throwable ex) { + return new KafkaAdminException("Failed to describe consumer group " + groupName, ex); + } + /** * Delete a consumer group. * @@ -93,6 +104,63 @@ public void deleteConsumerGroup(final String groupName) { } } + /** + * Describe a consumer group. + * + * @param groupName the consumer group name + * @return consumer group description + */ + public ConsumerGroupDescription describe(final String groupName) { + log.info("Describing consumer group '{}'", groupName); + try { + final ConsumerGroupDescription description = + this.adminClient.describeConsumerGroups(List.of(groupName)) + .all() + .get(this.timeout.toSeconds(), TimeUnit.SECONDS) + .get(groupName); + log.info("Described consumer group '{}'", groupName); + return description; + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + throw failedToDescribeGroup(groupName, ex); + } catch (final ExecutionException ex) { + if (ex.getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause(); + } + throw failedToDescribeGroup(groupName, ex); + } catch (final TimeoutException ex) { + throw failedToDescribeGroup(groupName, ex); + } + } + + /** + * List offsets for a consumer group.
+ * + * @param groupName the consumer group name + * @return consumer group offsets + */ + public Map<TopicPartition, OffsetAndMetadata> listOffsets(final String groupName) { + log.info("Listing offsets for consumer group '{}'", groupName); + try { + final Map<TopicPartition, OffsetAndMetadata> offsets = + this.adminClient.listConsumerGroupOffsets(groupName) + .partitionsToOffsetAndMetadata(groupName) + .get(this.timeout.toSeconds(), TimeUnit.SECONDS); + log.info("Listed offsets for consumer group '{}'", groupName); + return offsets; + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + throw failedToListOffsets(groupName, ex); + } catch (final ExecutionException ex) { + if (ex.getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause(); + } + throw failedToListOffsets(groupName, ex); + } catch (final TimeoutException ex) { + throw failedToListOffsets(groupName, ex); + } + } + @Override public void close() { this.adminClient.close(); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java index 65dcb037..ab9ee10c 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java @@ -33,16 +33,21 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.jooq.lambda.Seq; /** * This class offers helpers to interact with Kafka topics. @@ -73,10 +78,6 @@ private static KafkaAdminException failedToRetrieveTopicDescription(final String return new KafkaAdminException("Failed to retrieve description of topic " + topicName, e); } - private static KafkaAdminException failedToCheckIfTopicExists(final String topicName, final Throwable e) { - return new KafkaAdminException("Failed to check if Kafka topic " + topicName + " exists", e); - } - private static KafkaAdminException failedToListTopics(final Throwable ex) { return new KafkaAdminException("Failed to list topics", ex); } @@ -85,6 +86,10 @@ private static KafkaAdminException failedToCreateTopic(final String topicName, f return new KafkaAdminException("Failed to create topic " + topicName, ex); } + private static KafkaAdminException failedToListOffsets(final Throwable ex) { + return new KafkaAdminException("Failed to list offsets", ex); + } + /** * Creates a new Kafka topic with the specified number of partitions if it does not yet exist. If the topic exists, * its configuration is not updated.
@@ -150,32 +155,17 @@ public void deleteTopic(final String topicName) { * @return settings of topic including number of partitions and replicationFactor */ public TopicSettings describe(final String topicName) { - try { - final Map<String, KafkaFuture<TopicDescription>> kafkaTopicMap = - this.adminClient.describeTopics(List.of(topicName)).topicNameValues(); - final TopicDescription description = - kafkaTopicMap.get(topicName).get(this.timeout.toSeconds(), TimeUnit.SECONDS); - final List<TopicPartitionInfo> partitions = description.partitions(); - final int replicationFactor = partitions.stream() - .findFirst() - .map(TopicPartitionInfo::replicas) - .map(List::size) - .orElseThrow(() -> new IllegalStateException("Topic " + topicName + " has no partitions")); - return TopicSettings.builder() - .replicationFactor((short) replicationFactor) - .partitions(partitions.size()) - .build(); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - throw failedToRetrieveTopicDescription(topicName, e); - } catch (final ExecutionException e) { - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); - } - throw failedToRetrieveTopicDescription(topicName, e); - } catch (final TimeoutException e) { - throw failedToRetrieveTopicDescription(topicName, e); - } + final TopicDescription description = this.getDescription(topicName); + final List<TopicPartitionInfo> partitions = description.partitions(); + final int replicationFactor = partitions.stream() + .findFirst() + .map(TopicPartitionInfo::replicas) + .map(List::size) + .orElseThrow(() -> new IllegalStateException("Topic " + topicName + " has no partitions")); + return TopicSettings.builder() + .replicationFactor((short) replicationFactor) + .partitions(partitions.size()) + .build(); } @Override @@ -190,24 +180,60 @@ public void close() { * @return whether a Kafka topic with the specified name exists or not */ public boolean exists(final String topicName) { + try { + this.getDescription(topicName); + return true; + } catch (final UnknownTopicOrPartitionException e) { + return false; + } + } + + /** + * Describe a Kafka topic. + * + * @param topicName the topic name + * @return topic description + */ + public TopicDescription getDescription(final String topicName) { try { final Map<String, KafkaFuture<TopicDescription>> kafkaTopicMap = this.adminClient.describeTopics(List.of(topicName)).topicNameValues(); - kafkaTopicMap.get(topicName).get(this.timeout.toSeconds(), TimeUnit.SECONDS); - return true; + return kafkaTopicMap.get(topicName).get(this.timeout.toSeconds(), TimeUnit.SECONDS); } catch (final ExecutionException e) { - if (e.getCause() instanceof UnknownTopicOrPartitionException) { - return false; + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); } + throw failedToRetrieveTopicDescription(topicName, e); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw failedToRetrieveTopicDescription(topicName, e); + } catch (final TimeoutException e) { + throw failedToRetrieveTopicDescription(topicName, e); + } + } + + /** + * List offsets for a set of partitions.
+ * + * @param topicPartitions partitions to list offsets for + * @return partition offsets + */ + public Map<TopicPartition, ListOffsetsResultInfo> listOffsets(final Iterable<TopicPartition> topicPartitions) { + try { + final Map<TopicPartition, OffsetSpec> offsetRequest = Seq.seq(topicPartitions) + .toMap(Function.identity(), o -> OffsetSpec.latest()); + return this.adminClient.listOffsets(offsetRequest).all() + .get(this.timeout.toSeconds(), TimeUnit.SECONDS); + } catch (final ExecutionException e) { if (e.getCause() instanceof RuntimeException) { throw (RuntimeException) e.getCause(); } - throw failedToCheckIfTopicExists(topicName, e); + throw failedToListOffsets(e); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); - throw failedToCheckIfTopicExists(topicName, e); + throw failedToListOffsets(e); } catch (final TimeoutException e) { - throw failedToCheckIfTopicExists(topicName, e); + throw failedToListOffsets(e); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index 26477c90..014536a8 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -132,16 +132,8 @@ private static void clean(final ExecutableApp app } } - private static void run(final ExecutableApp app) throws InterruptedException { - try (final StreamsRunner runner = app.createRunner()) { - StreamsRunnerTest.run(runner); - // Wait until stream application has consumed all data - Thread.sleep(TIMEOUT.toMillis()); - } - } - @Test - void shouldDeleteTopic() throws InterruptedException { + void shouldDeleteTopic() { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { @@ -160,11 +152,11 @@ void shouldDeleteTopic() { new KeyValue<>("blub", 2L) ); - run(executableApp); + this.run(executableApp); this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(app.getUniqueAppId(), TIMEOUT); clean(executableApp); try (final ImprovedAdminClient admin = testClient.admin(); @@ -177,7 +169,7 @@ } @Test - void shouldDeleteConsumerGroup() throws InterruptedException { + void shouldDeleteConsumerGroup() { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { @@ -196,7 +188,7 @@ void shouldDeleteConsumerGroup() { new KeyValue<>("blub", 2L) ); - run(executableApp); + this.run(executableApp); this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); @@ -207,7 +199,7 @@ void shouldDeleteConsumerGroup() { .isTrue(); } - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(app.getUniqueAppId(), TIMEOUT); clean(executableApp); try (final ImprovedAdminClient adminClient = this.createAdminClient(); @@ -220,7 +212,7 @@ } @Test - void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedException { + void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() { try
(final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { @@ -239,7 +231,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept new KeyValue<>("blub", 2L) ); - run(executableApp); + this.run(executableApp); this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); @@ -250,7 +242,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept .isTrue(); } - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(app.getUniqueAppId(), TIMEOUT); try (final ImprovedAdminClient adminClient = this.createAdminClient(); final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { @@ -264,7 +256,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept } @Test - void shouldDeleteInternalTopics() throws InterruptedException { + void shouldDeleteInternalTopics() { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { @@ -278,7 +270,7 @@ void shouldDeleteInternalTopics() throws InterruptedException { new SimpleProducerRecord<>("key 1", testRecord) )); - run(executableApp); + this.run(executableApp); final List inputTopics = app.getTopics().getInputTopics(); final String uniqueAppId = app.getUniqueAppId(); @@ -298,7 +290,7 @@ void shouldDeleteInternalTopics() throws InterruptedException { this.softly.assertThat(topicClient.exists(manualTopic)).isTrue(); } - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(app.getUniqueAppId(), TIMEOUT); reset(executableApp); try (final ImprovedAdminClient admin = testClient.admin(); @@ -314,7 +306,7 @@ void shouldDeleteInternalTopics() throws InterruptedException { } @Test - void shouldDeleteIntermediateTopics() throws InterruptedException { + void shouldDeleteIntermediateTopics() { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { @@ -328,7 +320,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { new SimpleProducerRecord<>("key 1", testRecord) )); - run(executableApp); + this.run(executableApp); final List inputTopics = app.getTopics().getInputTopics(); final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC; @@ -341,7 +333,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { this.softly.assertThat(topicClient.exists(manualTopic)).isTrue(); } - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(app.getUniqueAppId(), TIMEOUT); clean(executableApp); try (final ImprovedAdminClient admin = testClient.admin(); @@ -355,7 +347,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { } @Test - void shouldDeleteState() throws InterruptedException { + void shouldDeleteState() { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { @@ -374,14 +366,14 @@ void shouldDeleteState() throws InterruptedException { new KeyValue<>("blub", 2L) ); - run(executableApp); + this.run(executableApp); this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "All entries are once in the input topic after the 1st run"); - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(app.getUniqueAppId(), TIMEOUT); 
reset(executableApp); - run(executableApp); + this.run(executableApp); final List> entriesTwice = expectedValues.stream() .flatMap(entry -> Stream.of(entry, entry)) .collect(Collectors.toList()); @@ -391,7 +383,7 @@ void shouldDeleteState() throws InterruptedException { } @Test - void shouldReprocessAlreadySeenRecords() throws InterruptedException { + void shouldReprocessAlreadySeenRecords() { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { @@ -404,23 +396,23 @@ void shouldReprocessAlreadySeenRecords() throws InterruptedException { new SimpleProducerRecord<>(null, "c") )); - run(executableApp); + this.run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 3); - run(executableApp); + this.run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 3); - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + // Wait until all stream applications are completely stopped before triggering cleanup + this.awaitClosed(app.getUniqueAppId(), TIMEOUT); reset(executableApp); - run(executableApp); + this.run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 6); } } @Test void shouldDeleteValueSchema() - throws InterruptedException, IOException, RestClientException { + throws IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorValueApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.getSchemaRegistryClient()) { @@ -434,10 +426,10 @@ void shouldDeleteValueSchema() new SimpleProducerRecord<>(null, testRecord) )); - run(executableApp); + this.run(executableApp); - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + // Wait until all stream applications are completely stopped before triggering cleanup + this.awaitClosed(app.getUniqueAppId(), TIMEOUT); final String outputTopic = app.getTopics().getOutputTopic(); this.softly.assertThat(client.getAllSubjects()) .contains(outputTopic + "-value", inputTopic + "-value"); @@ -450,7 +442,7 @@ void shouldDeleteValueSchema() @Test void shouldDeleteKeySchema() - throws InterruptedException, IOException, RestClientException { + throws IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.getSchemaRegistryClient()) { @@ -464,10 +456,10 @@ void shouldDeleteKeySchema() new SimpleProducerRecord<>(testRecord, "val") )); - run(executableApp); + this.run(executableApp); - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + // Wait until all stream applications are completely stopped before triggering cleanup + this.awaitClosed(app.getUniqueAppId(), TIMEOUT); final String outputTopic = app.getTopics().getOutputTopic(); this.softly.assertThat(client.getAllSubjects()) .contains(outputTopic + "-key", inputTopic + "-key"); @@ -480,7 +472,7 @@ void shouldDeleteKeySchema() @Test void shouldDeleteSchemaOfInternalTopics() - throws InterruptedException, IOException, RestClientException { + throws IOException, RestClientException { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final 
ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.getSchemaRegistryClient()) { @@ -495,10 +487,10 @@ void shouldDeleteSchemaOfInternalTopics() new SimpleProducerRecord<>("key 1", testRecord) )); - run(executableApp); + this.run(executableApp); - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + // Wait until all stream applications are completely stopped before triggering cleanup + this.awaitClosed(app.getUniqueAppId(), TIMEOUT); final String inputSubject = inputTopic + "-value"; final String uniqueAppId = app.getUniqueAppId(); final String internalSubject = @@ -518,7 +510,7 @@ void shouldDeleteSchemaOfInternalTopics() @Test void shouldDeleteSchemaOfIntermediateTopics() - throws InterruptedException, IOException, RestClientException { + throws IOException, RestClientException { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.getSchemaRegistryClient()) { @@ -533,10 +525,10 @@ void shouldDeleteSchemaOfIntermediateTopics() new SimpleProducerRecord<>("key 1", testRecord) )); - run(executableApp); + this.run(executableApp); - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + // Wait until all stream applications are completely stopped before triggering cleanup + this.awaitClosed(app.getUniqueAppId(), TIMEOUT); final String inputSubject = inputTopic + "-value"; final String manualSubject = ComplexTopologyApplication.THROUGH_TOPIC + "-value"; this.softly.assertThat(client.getAllSubjects()) @@ -588,16 +580,15 @@ void shouldNotThrowExceptionOnMissingInputTopic() { } @Test - void shouldThrowExceptionOnResetterError() throws InterruptedException { + void shouldThrowExceptionOnResetterError() { try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); - final ExecutableStreamsApp executableApp = app.withEndpoint( - this.createEndpoint()); + final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final StreamsRunner runner = executableApp.createRunner()) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getTopics().getInputTopics().get(0)); StreamsRunnerTest.run(runner); // Wait until stream application has consumed all data - Thread.sleep(TIMEOUT.toMillis()); + this.awaitActive(app.getUniqueAppId(), TIMEOUT); // should throw exception because consumer group is still active this.softly.assertThatThrownBy(() -> reset(executableApp)) .isInstanceOf(CleanUpException.class) @@ -606,7 +597,7 @@ void shouldThrowExceptionOnResetterError() throws InterruptedException { } @Test - void shouldReprocessAlreadySeenRecordsWithPattern() throws InterruptedException { + void shouldReprocessAlreadySeenRecordsWithPattern() { try (final ConfiguredStreamsApp app = createWordCountPatternApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { @@ -622,20 +613,28 @@ void shouldReprocessAlreadySeenRecordsWithPattern() throws InterruptedException new SimpleProducerRecord<>(null, "c") )); - run(executableApp); + this.run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 3); - run(executableApp); + this.run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 3); - // Wait until all stream application are 
completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + // Wait until all stream applications are completely stopped before triggering cleanup + this.awaitClosed(app.getUniqueAppId(), TIMEOUT); reset(executableApp); - run(executableApp); + this.run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 6); } } + private void run(final ExecutableStreamsApp app) { + try (final StreamsRunner runner = app.createRunner()) { + StreamsRunnerTest.run(runner); + // Wait until stream application has consumed all data + this.awaitProcessing(app.getApp().getUniqueAppId(app.getEffectiveConfig().getTopics()), TIMEOUT); + } + } + private ConfiguredStreamsApp createComplexApplication() { this.newTestClient().createTopic(ComplexTopologyApplication.THROUGH_TOPIC); return configureApp(new ComplexTopologyApplication(), StreamsTopicConfig.builder() diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java index 612955e0..a6a81c0f 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java @@ -172,7 +172,7 @@ void shouldUseMultipleLabeledInputTopics() { } @Test - void shouldThrowOnMissingInputTopic() throws InterruptedException { + void shouldThrowOnMissingInputTopic() { when(this.uncaughtExceptionHandler.handle(any())).thenReturn(StreamThreadExceptionResponse.SHUTDOWN_CLIENT); try (final ConfiguredStreamsApp app = createMirrorApplication(); final StreamsRunner runner = app.withEndpoint(this.createEndpointWithoutSchemaRegistry()) @@ -183,7 +183,7 @@ final Thread thread = run(runner); final CapturingUncaughtExceptionHandler handler = (CapturingUncaughtExceptionHandler) thread.getUncaughtExceptionHandler(); - Thread.sleep(TIMEOUT.toMillis()); + awaitAtMost(TIMEOUT).until(() -> !thread.isAlive()); this.softly.assertThat(thread.isAlive()).isFalse(); this.softly.assertThat(handler.getLastException()).isInstanceOf(MissingSourceTopicException.class); verify(this.uncaughtExceptionHandler).handle(any()); @@ -192,7 +192,7 @@ } @Test - void shouldCloseOnMapError() throws InterruptedException { + void shouldCloseOnMapError() { when(this.uncaughtExceptionHandler.handle(any())).thenReturn(StreamThreadExceptionResponse.SHUTDOWN_CLIENT); try (final ConfiguredStreamsApp app = createErrorApplication(); final StreamsRunner runner = app.withEndpoint(this.createEndpointWithoutSchemaRegistry()) @@ -211,7 +211,7 @@ void shouldCloseOnMapError() { .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(inputTopic, List.of(new SimpleProducerRecord<>("foo", "bar"))); - Thread.sleep(TIMEOUT.toMillis()); + this.awaitProcessing(app.getUniqueAppId(), TIMEOUT); this.softly.assertThat(thread.isAlive()).isFalse(); this.softly.assertThat(handler.getLastException()).isInstanceOf(StreamsException.class) .satisfies(e -> this.softly.assertThat(e.getCause()).hasMessage("Error in map")); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java index
5a3405c3..cd7097fa 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java @@ -56,7 +56,7 @@ class SchemaTopicClientTest extends KafkaTest { @Test void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() - throws InterruptedException, IOException, RestClientException { + throws IOException, RestClientException { final KafkaTestClient testClient = this.newTestClient(); try (final ImprovedAdminClient admin = testClient.admin(); final TopicClient topicClient = admin.getTopicClient()) { @@ -79,8 +79,6 @@ void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() schemaTopicClient.deleteTopicAndResetSchemaRegistry(TOPIC); } - Thread.sleep(TIMEOUT.toMillis()); - this.softly.assertThat(client.getAllSubjects()) .doesNotContain(TOPIC + "-value"); this.softly.assertThat(topicClient.exists(TOPIC)) @@ -89,7 +87,7 @@ void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() } @Test - void shouldResetSchema() throws InterruptedException, IOException, RestClientException { + void shouldResetSchema() throws IOException, RestClientException { final KafkaTestClient testClient = this.newTestClient(); try (final ImprovedAdminClient admin = testClient.admin(); final TopicClient topicClient = admin.getTopicClient()) { @@ -112,8 +110,6 @@ void shouldResetSchema() throws InterruptedException, IOException, RestClientExc schemaTopicClient.resetSchemaRegistry(TOPIC); } - Thread.sleep(TIMEOUT.toMillis()); - this.softly.assertThat(client.getAllSubjects()) .doesNotContain(TOPIC + "-value"); this.softly.assertThat(topicClient.exists(TOPIC)) @@ -122,7 +118,7 @@ void shouldResetSchema() throws InterruptedException, IOException, RestClientExc } @Test - void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws InterruptedException, RestClientException, + void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws RestClientException, IOException { final KafkaTestClient testClient = this.newTestClient(); try (final ImprovedAdminClient admin = testClient.admin(); @@ -146,7 +142,6 @@ void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws Interr schemaTopicClient.deleteTopicAndResetSchemaRegistry(TOPIC); } - Thread.sleep(TIMEOUT.toMillis()); this.softly.assertThat(client.getAllSubjects()) .contains(TOPIC + "-value"); this.softly.assertThat(topicClient.exists(TOPIC)) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java index 873b81d4..e9df279d 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java @@ -46,40 +46,42 @@ void shouldNotFindTopic() { } @Test - void shouldFindTopic() throws InterruptedException { + void shouldFindTopic() { try (final TopicClient client = this.createClient()) { client.createTopic("exists", KafkaTestClient.defaultTopicSettings().build()); } - Thread.sleep(CLIENT_TIMEOUT.toMillis()); - try (final TopicClient client = this.createClient()) { - assertThat(client.exists("exists")).isTrue(); - } + awaitAtMost(CLIENT_TIMEOUT) + .untilAsserted(() -> { + try (final TopicClient client = this.createClient()) { + assertThat(client.exists("exists")).isTrue(); + } + }); } @Test - void shouldListTopics() throws InterruptedException { + void shouldListTopics() { try (final TopicClient 
client = this.createClient()) { client.createTopic("foo", KafkaTestClient.defaultTopicSettings().build()); client.createTopic("bar", KafkaTestClient.defaultTopicSettings().build()); } - Thread.sleep(CLIENT_TIMEOUT.toMillis()); - try (final TopicClient client = this.createClient()) { - assertThat(client.listTopics()) - .hasSize(2) - .containsExactlyInAnyOrder("foo", "bar"); - } + awaitAtMost(CLIENT_TIMEOUT) + .untilAsserted(() -> { + try (final TopicClient client = this.createClient()) { + assertThat(client.listTopics()) + .hasSize(2) + .containsExactlyInAnyOrder("foo", "bar"); + } + }); } @Test - void shouldDeleteTopic() throws InterruptedException { + void shouldDeleteTopic() { try (final TopicClient client = this.createClient()) { client.createTopic("foo", KafkaTestClient.defaultTopicSettings().build()); } - Thread.sleep(CLIENT_TIMEOUT.toMillis()); try (final TopicClient client = this.createClient()) { - assertThat(client.listTopics()) - .hasSize(1) - .containsExactlyInAnyOrder("foo"); + awaitAtMost(CLIENT_TIMEOUT) + .until(() -> client.exists("foo")); client.deleteTopic("foo"); assertThat(client.listTopics()) .isEmpty(); @@ -87,7 +89,7 @@ } @Test - void shouldCreateTopic() throws InterruptedException { + void shouldCreateTopic() { try (final TopicClient client = this.createClient()) { assertThat(client.exists("topic")).isFalse(); final TopicSettings settings = TopicSettings.builder() @@ -96,8 +98,8 @@ .replicationFactor((short) 1) .build(); client.createTopic("topic", settings, emptyMap()); - Thread.sleep(CLIENT_TIMEOUT.toMillis()); - assertThat(client.exists("topic")).isTrue(); + awaitAtMost(CLIENT_TIMEOUT) + .until(() -> client.exists("topic")); assertThat(client.describe("topic")) .satisfies(info -> { assertThat(info.getReplicationFactor()).isEqualTo((short) 1); diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java index c58c8722..a9ac5d45 100644 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java @@ -24,7 +24,11 @@ package com.bakdata.kafka; +import static org.awaitility.Awaitility.await; + import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import java.time.Duration; +import org.awaitility.core.ConditionFactory; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.kafka.KafkaContainer; @@ -32,6 +36,8 @@ @Testcontainers public abstract class KafkaTest { + private static final Duration POLL_INTERVAL = Duration.ofSeconds(1L); + private static final Duration POLL_DELAY = Duration.ofSeconds(1L); private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry(); @Container private final KafkaContainer kafkaCluster = newCluster(); @@ -40,6 +46,13 @@ public static KafkaContainer newCluster() { return new KafkaContainer(DockerImageName.parse("apache/kafka-native:3.8.1")); } + public static ConditionFactory awaitAtMost(final Duration timeout) { + return await() + .pollInterval(POLL_INTERVAL) + .pollDelay(POLL_DELAY) + .atMost(timeout); + } + protected KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { return KafkaEndpointConfig.builder() .bootstrapServers(this.getBootstrapServers()) @@ -68,4 +81,46 @@ protected String
getSchemaRegistryUrl() { protected SchemaRegistryClient getSchemaRegistryClient() { return this.testTopologyFactory.getSchemaRegistryClient(); } + + protected void awaitMessages(final String topic, final int numberOfMessages, final Duration timeout) { + awaitAtMost(timeout) + .until(() -> this.hasMessages(topic, numberOfMessages, timeout)); + } + + protected void awaitProcessing(final String group, final Duration timeout) { + this.awaitActive(group, timeout); + awaitAtMost(timeout) + .until(() -> this.hasFinishedProcessing(group)); + } + + protected void awaitActive(final String group, final Duration timeout) { + awaitAtMost(timeout) + .until(() -> this.isActive(group)); + } + + protected void awaitClosed(final String group, final Duration timeout) { + awaitAtMost(timeout) + .until(() -> this.isClosed(group)); + } + + private boolean hasMessages(final String topic, final int numberOfMessages, final Duration timeout) { + return this.verifier().hasMessages(topic, numberOfMessages, timeout); + } + + private ProgressVerifier verifier() { + return new ProgressVerifier(this.newTestClient()); + } + + private boolean hasFinishedProcessing(final String group) { + return this.verifier().hasFinishedProcessing(group); + } + + private boolean isClosed(final String group) { + return this.verifier().isClosed(group); + } + + private boolean isActive(final String group) { + return this.verifier().isActive(group); + } + } diff --git a/streams-bootstrap-test/build.gradle.kts b/streams-bootstrap-test/build.gradle.kts index 99a47667..a9010254 100644 --- a/streams-bootstrap-test/build.gradle.kts +++ b/streams-bootstrap-test/build.gradle.kts @@ -8,4 +8,5 @@ dependencies { name = "fluent-kafka-streams-tests-junit5", version = fluentKafkaVersion ) + implementation(group = "com.bakdata.seq2", name = "seq2", version = "1.0.12") } diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ProgressVerifier.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ProgressVerifier.java new file mode 100644 index 00000000..fd2104a0 --- /dev/null +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ProgressVerifier.java @@ -0,0 +1,97 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.bakdata.kafka; + +import com.bakdata.kafka.util.ConsumerGroupClient; +import com.bakdata.kafka.util.ImprovedAdminClient; +import com.bakdata.kafka.util.TopicClient; +import com.bakdata.util.seq2.PairSeq; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +@RequiredArgsConstructor +public class ProgressVerifier { + + private final KafkaTestClient testClient; + + public boolean isActive(final String group) { + return this.getState(group) == ConsumerGroupState.STABLE; + } + + public boolean isClosed(final String group) { + return this.getState(group) == ConsumerGroupState.EMPTY; + } + + public boolean hasFinishedProcessing(final String group) { + return this.computeLag(group) == 0; + } + + public boolean hasMessages(final String topic, final int numberOfMessages, final Duration timeout) { + final List> records = this.testClient.read() + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .from(topic, timeout); + return records.size() == numberOfMessages; + } + + private long computeLag(final String group) { + try (final ImprovedAdminClient admin = this.testClient.admin(); + final ConsumerGroupClient consumerGroupClient = admin.getConsumerGroupClient(); + final TopicClient topicClient = admin.getTopicClient()) { + final Map consumerOffsetsMap = consumerGroupClient.listOffsets(group); + final PairSeq consumerOffsets = PairSeq.seq(consumerOffsetsMap) + .mapValues(OffsetAndMetadata::offset); + final Map partitionOffsetsMap = + topicClient.listOffsets(consumerOffsetsMap.keySet()); + final PairSeq partitionOffsets = PairSeq.seq(partitionOffsetsMap) + .mapValues(ListOffsetsResultInfo::offset); + return consumerOffsets.innerJoinByKey(partitionOffsets) + .values() + .mapToPair(Function.identity()) + .map((consumerOffset, latestOffset) -> latestOffset - consumerOffset) + .sum() + .orElse(0L); + } + } + + private ConsumerGroupState getState(final String group) { + try (final ImprovedAdminClient admin = this.testClient.admin(); + final ConsumerGroupClient consumerGroupClient = admin.getConsumerGroupClient()) { + final ConsumerGroupDescription description = consumerGroupClient.describe(group); + return description.state(); + } + } +}