From ef7d0fac7a25d9ae8f68282f3a855e8a7dca0048 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 23 Jan 2025 09:14:18 +0100 Subject: [PATCH 01/15] Make methods running before application start public --- .../com/bakdata/kafka/KafkaApplication.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index 15b68e12..aeaded91 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -257,36 +257,36 @@ public final CleanableApp createCleanableApp() { return cleanableApp; } - /** - * Create a new {@code ConfiguredApp} that will be executed according to the given config. - * - * @param app app to configure. - * @param configuration configuration for app - * @return {@code ConfiguredApp} - */ - protected abstract CA createConfiguredApp(final A app, AppConfiguration configuration); - /** * Called before starting the application, e.g., invoking {@link #run()} */ - protected void onApplicationStart() { + public void onApplicationStart() { // do nothing by default } /** * Called before running the application, i.e., invoking {@link #run()} */ - protected void prepareRun() { + public void prepareRun() { // do nothing by default } /** * Called before cleaning the application, i.e., invoking {@link #clean()} */ - protected void prepareClean() { + public void prepareClean() { // do nothing by default } + /** + * Create a new {@code ConfiguredApp} that will be executed according to the given config. + * + * @param app app to configure. + * @param configuration configuration for app + * @return {@code ConfiguredApp} + */ + protected abstract CA createConfiguredApp(final A app, AppConfiguration configuration); + private void startApplication() { Runtime.getRuntime().addShutdownHook(new Thread(this::close)); this.onApplicationStart(); From 6ef31f0c9f7771863e037af02ebc411da0c6aa91 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 23 Jan 2025 09:17:35 +0100 Subject: [PATCH 02/15] Make methods running before application start public --- .../main/java/com/bakdata/kafka/KafkaStreamsApplication.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 6f561ad5..f965244a 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -154,7 +154,7 @@ public final ConfiguredStreamsApp createConfiguredApp(final T app, * Called before cleaning the application, i.e., invoking {@link #clean()} or {@link #reset()} */ @Override - protected void prepareClean() { + public void prepareClean() { super.prepareClean(); } From a725afba712e0a5a883ab91e05ae3128ab6399da Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 23 Jan 2025 10:12:59 +0100 Subject: [PATCH 03/15] Make methods running before application start public --- .../com/bakdata/kafka/TestTopologyFactory.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java index 883a027b..b39381a7 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java @@ -38,6 +38,7 @@ import java.util.Objects; import java.util.UUID; import lombok.AccessLevel; +import lombok.Getter; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsConfig; @@ -46,6 +47,7 @@ * Class that provides helpers for using Fluent Kafka Streams Tests with {@link ConfiguredStreamsApp} */ @RequiredArgsConstructor(access = AccessLevel.PRIVATE) +@Getter public final class TestTopologyFactory { private static final String MOCK_URL_PREFIX = "mock://"; @@ -122,15 +124,6 @@ public static Map createStreamsTestConfig() { return STREAMS_TEST_CONFIG; } - /** - * Get Schema Registry URL if configured - * @return Schema Registry URL - * @throws NullPointerException if Schema Registry is not configured - */ - public String getSchemaRegistryUrl() { - return Objects.requireNonNull(this.schemaRegistryUrl, "Schema Registry is not configured"); - } - /** * Get {@code SchemaRegistryClient} for configured URL with default providers * @return {@code SchemaRegistryClient} @@ -147,8 +140,10 @@ public SchemaRegistryClient getSchemaRegistryClient() { * @throws NullPointerException if Schema Registry is not configured */ public SchemaRegistryClient getSchemaRegistryClient(final List providers) { - return SchemaRegistryClientFactory.newClient(List.of(this.getSchemaRegistryUrl()), 0, providers, emptyMap(), - null); + final List baseUrls = List.of( + Objects.requireNonNull(this.schemaRegistryUrl, "Schema Registry is not configured") + ); + return SchemaRegistryClientFactory.newClient(baseUrls, 0, providers, emptyMap(), null); } /** From d40198dd8bd84f87794bc632597d07e9413783fd Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 23 Jan 2025 12:22:05 +0100 Subject: [PATCH 04/15] Make methods running before application start public --- settings.gradle | 1 + streams-bootstrap-cli-test/build.gradle.kts | 6 ++ streams-bootstrap-cli-test/lombok.config | 3 + .../CapturingUncaughtExceptionHandler.java | 38 +++++++ .../bakdata/kafka/TestApplicationHelper.java | 101 ++++++++++++++++++ streams-bootstrap-cli/build.gradle.kts | 1 + .../kafka/integration/RunStreamsAppTest.java | 4 +- .../kafka/integration/StreamsCleanUpTest.java | 4 +- 8 files changed, 154 insertions(+), 4 deletions(-) create mode 100644 streams-bootstrap-cli-test/build.gradle.kts create mode 100644 streams-bootstrap-cli-test/lombok.config create mode 100644 streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/CapturingUncaughtExceptionHandler.java create mode 100644 streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java diff --git a/settings.gradle b/settings.gradle index e9057df9..05c49fdb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -11,4 +11,5 @@ include( ":streams-bootstrap-test", ":streams-bootstrap-large-messages", ":streams-bootstrap-cli", + ":streams-bootstrap-cli-test", ) diff --git a/streams-bootstrap-cli-test/build.gradle.kts b/streams-bootstrap-cli-test/build.gradle.kts new file mode 100644 index 00000000..3779fdb0 --- /dev/null +++ b/streams-bootstrap-cli-test/build.gradle.kts @@ -0,0 +1,6 @@ +description = "Utils for testing your Kafka Streams Application" + +dependencies { + api(project(":streams-bootstrap-test")) + api(project(":streams-bootstrap-cli")) +} diff --git a/streams-bootstrap-cli-test/lombok.config b/streams-bootstrap-cli-test/lombok.config new file mode 100644 index 00000000..189c0bef --- /dev/null +++ b/streams-bootstrap-cli-test/lombok.config @@ -0,0 +1,3 @@ +# This file is generated by the 'io.freefair.lombok' Gradle plugin +config.stopBubbling = true +lombok.addLombokGeneratedAnnotation = true diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/CapturingUncaughtExceptionHandler.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/CapturingUncaughtExceptionHandler.java new file mode 100644 index 00000000..2b83dc72 --- /dev/null +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/CapturingUncaughtExceptionHandler.java @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.lang.Thread.UncaughtExceptionHandler; +import lombok.Getter; + +@Getter +public class CapturingUncaughtExceptionHandler implements UncaughtExceptionHandler { + private Throwable lastException; + + @Override + public void uncaughtException(final Thread t, final Throwable e) { + this.lastException = e; + } +} diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java new file mode 100644 index 00000000..45f56af9 --- /dev/null +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java @@ -0,0 +1,101 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import com.bakdata.fluent_kafka_streams_tests.TestTopology; +import java.lang.Thread.UncaughtExceptionHandler; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.experimental.Delegate; +import picocli.CommandLine; + +@RequiredArgsConstructor +public final class TestApplicationHelper { + + @Delegate + private final @NonNull TestTopologyFactory topologyFactory; + + public static TestApplicationHelper withoutSchemaRegistry() { + return new TestApplicationHelper(TestTopologyFactory.withoutSchemaRegistry()); + } + + public static TestApplicationHelper withSchemaRegistry() { + return new TestApplicationHelper(TestTopologyFactory.withSchemaRegistry()); + } + + public static TestApplicationHelper withSchemaRegistry(final String schemaRegistryUrl) { + return new TestApplicationHelper(TestTopologyFactory.withSchemaRegistry(schemaRegistryUrl)); + } + + public Thread runApplication(final KafkaStreamsApplication app) { + this.configure(app); + new CommandLine(app); // initialize all mixins + app.onApplicationStart(); + final Thread thread = new Thread(app); + final UncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler(); + thread.setUncaughtExceptionHandler(handler); + thread.start(); + return thread; + } + + public ConsumerGroupVerifier verify(final KafkaStreamsApplication app) { + this.configure(app); + final KafkaEndpointConfig endpointConfig = app.getEndpointConfig(); + final KafkaTestClient testClient = new KafkaTestClient(endpointConfig); + try (final ConfiguredStreamsApp configuredApp = app.createConfiguredApp()) { + final String uniqueAppId = configuredApp.getUniqueAppId(); + return new ConsumerGroupVerifier(uniqueAppId, testClient::admin); + } + } + + public ConfiguredStreamsApp createConfiguredApp( + final KafkaStreamsApplication app) { + this.configure(app); + app.prepareRun(); + return app.createConfiguredApp(); + } + + public TestTopology createTopology(final KafkaStreamsApplication app) { + final ConfiguredStreamsApp configuredApp = this.createConfiguredApp(app); + return this.createTopology(configuredApp); + } + + public TestTopology createTopologyExtension(final KafkaStreamsApplication app) { + final ConfiguredStreamsApp configuredApp = this.createConfiguredApp(app); + return this.createTopologyExtension(configuredApp); + } + + public KafkaTestClient newTestClient(final String bootstrapServers) { + return new KafkaTestClient(KafkaEndpointConfig.builder() + .bootstrapServers(bootstrapServers) + .schemaRegistryUrl(this.getSchemaRegistryUrl()) + .build()); + } + + public void configure(final KafkaStreamsApplication app) { + app.setSchemaRegistryUrl(this.getSchemaRegistryUrl()); + } + +} diff --git a/streams-bootstrap-cli/build.gradle.kts b/streams-bootstrap-cli/build.gradle.kts index 7cc8ba99..7a3d7dd0 100644 --- a/streams-bootstrap-cli/build.gradle.kts +++ b/streams-bootstrap-cli/build.gradle.kts @@ -18,6 +18,7 @@ dependencies { testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion) testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion) testImplementation(testFixtures(project(":streams-bootstrap-core"))) + testImplementation(project(":streams-bootstrap-cli-test")) testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2") val confluentVersion: String by project testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index b3a99c2f..614ea3f3 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -31,6 +31,7 @@ import com.bakdata.kafka.KafkaTestClient; import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SimpleKafkaStreamsApplication; +import com.bakdata.kafka.TestApplicationHelper; import com.bakdata.kafka.TestTopologyFactory; import com.bakdata.kafka.test_applications.Mirror; import java.nio.file.Path; @@ -57,8 +58,7 @@ void shouldRunApp() { app.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir)); app.setInputTopics(List.of(input)); app.setOutputTopic(output); - // run in Thread because the application blocks indefinitely - new Thread(app).start(); + TestApplicationHelper.withoutSchemaRegistry().runApplication(app); testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 200af6f0..a44414db 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -31,6 +31,7 @@ import com.bakdata.kafka.KafkaTestClient; import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SimpleKafkaStreamsApplication; +import com.bakdata.kafka.TestApplicationHelper; import com.bakdata.kafka.TestTopologyFactory; import com.bakdata.kafka.test_applications.WordCount; import com.bakdata.kafka.util.ImprovedAdminClient; @@ -155,8 +156,7 @@ private void runAppAndClose(final KafkaStreamsApplication app) { } private void runApp(final KafkaStreamsApplication app) { - // run in Thread because the application blocks indefinitely - new Thread(app).start(); + TestApplicationHelper.withoutSchemaRegistry().runApplication(app); // Wait until stream application has consumed all data this.awaitProcessing(app.createExecutableApp()); } From 40623a9cce742056e1722689a3a4364509ce2f88 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 23 Jan 2025 12:38:25 +0100 Subject: [PATCH 05/15] Make methods running before application start public --- .../bakdata/kafka/TestApplicationHelper.java | 32 +++--- .../kafka/integration/RunStreamsAppTest.java | 3 +- .../kafka/integration/StreamsCleanUpTest.java | 4 +- .../com/bakdata/kafka/AvroMirrorTest.java | 4 +- .../java/com/bakdata/kafka/KafkaTest.java | 6 +- .../com/bakdata/kafka/SchemaRegistryEnv.java | 97 +++++++++++++++++++ .../bakdata/kafka/TestTopologyFactory.java | 66 +------------ 7 files changed, 125 insertions(+), 87 deletions(-) create mode 100644 streams-bootstrap-test/src/main/java/com/bakdata/kafka/SchemaRegistryEnv.java diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java index 45f56af9..927f9c68 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java @@ -26,28 +26,16 @@ import com.bakdata.fluent_kafka_streams_tests.TestTopology; import java.lang.Thread.UncaughtExceptionHandler; +import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; -import lombok.experimental.Delegate; import picocli.CommandLine; @RequiredArgsConstructor public final class TestApplicationHelper { - @Delegate - private final @NonNull TestTopologyFactory topologyFactory; - - public static TestApplicationHelper withoutSchemaRegistry() { - return new TestApplicationHelper(TestTopologyFactory.withoutSchemaRegistry()); - } - - public static TestApplicationHelper withSchemaRegistry() { - return new TestApplicationHelper(TestTopologyFactory.withSchemaRegistry()); - } - - public static TestApplicationHelper withSchemaRegistry(final String schemaRegistryUrl) { - return new TestApplicationHelper(TestTopologyFactory.withSchemaRegistry(schemaRegistryUrl)); - } + @Getter + private final @NonNull SchemaRegistryEnv schemaRegistryEnv; public Thread runApplication(final KafkaStreamsApplication app) { this.configure(app); @@ -79,23 +67,29 @@ public ConfiguredStreamsApp createConfiguredApp( public TestTopology createTopology(final KafkaStreamsApplication app) { final ConfiguredStreamsApp configuredApp = this.createConfiguredApp(app); - return this.createTopology(configuredApp); + final TestTopologyFactory testTopologyFactory = this.createTestTopologyFactory(); + return testTopologyFactory.createTopology(configuredApp); } public TestTopology createTopologyExtension(final KafkaStreamsApplication app) { final ConfiguredStreamsApp configuredApp = this.createConfiguredApp(app); - return this.createTopologyExtension(configuredApp); + final TestTopologyFactory testTopologyFactory = this.createTestTopologyFactory(); + return testTopologyFactory.createTopologyExtension(configuredApp); } public KafkaTestClient newTestClient(final String bootstrapServers) { return new KafkaTestClient(KafkaEndpointConfig.builder() .bootstrapServers(bootstrapServers) - .schemaRegistryUrl(this.getSchemaRegistryUrl()) + .schemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()) .build()); } public void configure(final KafkaStreamsApplication app) { - app.setSchemaRegistryUrl(this.getSchemaRegistryUrl()); + app.setSchemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()); + } + + private TestTopologyFactory createTestTopologyFactory() { + return new TestTopologyFactory(this.schemaRegistryEnv); } } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index 614ea3f3..b0601a37 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -24,6 +24,7 @@ package com.bakdata.kafka.integration; +import static com.bakdata.kafka.SchemaRegistryEnv.withoutSchemaRegistry; import static org.assertj.core.api.Assertions.assertThat; import com.bakdata.kafka.KafkaStreamsApplication; @@ -58,7 +59,7 @@ void shouldRunApp() { app.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir)); app.setInputTopics(List.of(input)); app.setOutputTopic(output); - TestApplicationHelper.withoutSchemaRegistry().runApplication(app); + new TestApplicationHelper(withoutSchemaRegistry()).runApplication(app); testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index a44414db..00e10b12 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -25,6 +25,8 @@ package com.bakdata.kafka.integration; +import static com.bakdata.kafka.SchemaRegistryEnv.withoutSchemaRegistry; + import com.bakdata.kafka.CloseFlagApp; import com.bakdata.kafka.KafkaStreamsApplication; import com.bakdata.kafka.KafkaTest; @@ -156,7 +158,7 @@ private void runAppAndClose(final KafkaStreamsApplication app) { } private void runApp(final KafkaStreamsApplication app) { - TestApplicationHelper.withoutSchemaRegistry().runApplication(app); + new TestApplicationHelper(withoutSchemaRegistry()).runApplication(app); // Wait until stream application has consumed all data this.awaitProcessing(app.createExecutableApp()); } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java index 553fde3b..3ddad2f2 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java @@ -24,6 +24,8 @@ package com.bakdata.kafka; +import static com.bakdata.kafka.SchemaRegistryEnv.withSchemaRegistry; + import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension; import com.bakdata.kafka.test_applications.MirrorWithNonDefaultSerde; import java.util.List; @@ -35,7 +37,7 @@ class AvroMirrorTest { private final ConfiguredStreamsApp app = createApp(); @RegisterExtension final TestTopologyExtension testTopology = - TestTopologyFactory.withSchemaRegistry().createTopologyExtension(this.app); + new TestTopologyFactory(withSchemaRegistry()).createTopologyExtension(this.app); private static ConfiguredStreamsApp createApp() { final AppConfiguration configuration = new AppConfiguration<>(StreamsTopicConfig.builder() diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java index 20a05f59..3e8b094e 100644 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java @@ -36,7 +36,7 @@ @Testcontainers public abstract class KafkaTest { protected static final Duration POLL_TIMEOUT = Duration.ofSeconds(10); - private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry(); + private final SchemaRegistryEnv schemaRegistryEnv = SchemaRegistryEnv.withSchemaRegistry(); @Container private final KafkaContainer kafkaCluster = newCluster(); @@ -73,11 +73,11 @@ protected KafkaTestClient newTestClient() { } protected String getSchemaRegistryUrl() { - return this.testTopologyFactory.getSchemaRegistryUrl(); + return this.schemaRegistryEnv.getSchemaRegistryUrl(); } protected SchemaRegistryClient getSchemaRegistryClient() { - return this.testTopologyFactory.getSchemaRegistryClient(); + return this.schemaRegistryEnv.getSchemaRegistryClient(); } protected void awaitProcessing(final ExecutableStreamsApp app) { diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/SchemaRegistryEnv.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/SchemaRegistryEnv.java new file mode 100644 index 00000000..53438d25 --- /dev/null +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/SchemaRegistryEnv.java @@ -0,0 +1,97 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import static java.util.Collections.emptyMap; + +import io.confluent.kafka.schemaregistry.SchemaProvider; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Class that provides helpers for using schema registry in tests. + */ +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +@Getter +public final class SchemaRegistryEnv { + + private static final String MOCK_URL_PREFIX = "mock://"; + private final String schemaRegistryUrl; + + /** + * Create a new {@code SchemaRegistryEnv} with no configured Schema Registry. + * @return {@code SchemaRegistryEnv} with no configured Schema Registry + */ + public static SchemaRegistryEnv withoutSchemaRegistry() { + return withSchemaRegistry(null); + } + + /** + * Create a new {@code SchemaRegistryEnv} with configured + * {@link io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry}. The scope is random in order to avoid + * collisions between different test instances as scopes are retained globally. + * @return {@code SchemaRegistryEnv} with configured Schema Registry + */ + public static SchemaRegistryEnv withSchemaRegistry() { + return withSchemaRegistry(MOCK_URL_PREFIX + UUID.randomUUID()); + } + + /** + * Create a new {@code SchemaRegistryEnv} with configured Schema Registry. + * @param schemaRegistryUrl Schema Registry URL to use + * @return {@code SchemaRegistryEnv} with configured Schema Registry + */ + public static SchemaRegistryEnv withSchemaRegistry(final String schemaRegistryUrl) { + return new SchemaRegistryEnv(schemaRegistryUrl); + } + + /** + * Get {@code SchemaRegistryClient} for configured URL with default providers + * @return {@code SchemaRegistryClient} + * @throws NullPointerException if Schema Registry is not configured + */ + public SchemaRegistryClient getSchemaRegistryClient() { + return this.getSchemaRegistryClient(null); + } + + /** + * Get {@code SchemaRegistryClient} for configured URL + * @param providers list of {@code SchemaProvider} to use for {@code SchemaRegistryClient} + * @return {@code SchemaRegistryClient} + * @throws NullPointerException if Schema Registry is not configured + */ + public SchemaRegistryClient getSchemaRegistryClient(final List providers) { + final List baseUrls = List.of( + Objects.requireNonNull(this.schemaRegistryUrl, "Schema Registry is not configured") + ); + return SchemaRegistryClientFactory.newClient(baseUrls, 0, providers, emptyMap(), null); + } +} diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java index b39381a7..9e4b8339 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java @@ -24,21 +24,13 @@ package com.bakdata.kafka; -import static java.util.Collections.emptyMap; - import com.bakdata.fluent_kafka_streams_tests.TestTopology; import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension; -import io.confluent.kafka.schemaregistry.SchemaProvider; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; import java.nio.file.Path; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.UUID; -import lombok.AccessLevel; import lombok.Getter; +import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsConfig; @@ -46,44 +38,16 @@ /** * Class that provides helpers for using Fluent Kafka Streams Tests with {@link ConfiguredStreamsApp} */ -@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +@RequiredArgsConstructor @Getter public final class TestTopologyFactory { - private static final String MOCK_URL_PREFIX = "mock://"; private static final Map STREAMS_TEST_CONFIG = Map.of( // Disable caching to allow immediate aggregations StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Long.toString(0L), ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(10_000) ); - private final String schemaRegistryUrl; - - /** - * Create a new {@code TestTopologyFactory} with no configured Schema Registry. - * @return {@code TestTopologyFactory} with no configured Schema Registry - */ - public static TestTopologyFactory withoutSchemaRegistry() { - return withSchemaRegistry(null); - } - - /** - * Create a new {@code TestTopologyFactory} with configured - * {@link io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry}. The scope is random in order to avoid - * collisions between different test instances as scopes are retained globally. - * @return {@code TestTopologyFactory} with configured Schema Registry - */ - public static TestTopologyFactory withSchemaRegistry() { - return withSchemaRegistry(MOCK_URL_PREFIX + UUID.randomUUID()); - } - - /** - * Create a new {@code TestTopologyFactory} with configured Schema Registry. - * @param schemaRegistryUrl Schema Registry URL to use - * @return {@code TestTopologyFactory} with configured Schema Registry - */ - public static TestTopologyFactory withSchemaRegistry(final String schemaRegistryUrl) { - return new TestTopologyFactory(schemaRegistryUrl); - } + private final @NonNull SchemaRegistryEnv schemaRegistryEnv; /** * Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and @@ -124,28 +88,6 @@ public static Map createStreamsTestConfig() { return STREAMS_TEST_CONFIG; } - /** - * Get {@code SchemaRegistryClient} for configured URL with default providers - * @return {@code SchemaRegistryClient} - * @throws NullPointerException if Schema Registry is not configured - */ - public SchemaRegistryClient getSchemaRegistryClient() { - return this.getSchemaRegistryClient(null); - } - - /** - * Get {@code SchemaRegistryClient} for configured URL - * @param providers list of {@code SchemaProvider} to use for {@code SchemaRegistryClient} - * @return {@code SchemaRegistryClient} - * @throws NullPointerException if Schema Registry is not configured - */ - public SchemaRegistryClient getSchemaRegistryClient(final List providers) { - final List baseUrls = List.of( - Objects.requireNonNull(this.schemaRegistryUrl, "Schema Registry is not configured") - ); - return SchemaRegistryClientFactory.newClient(baseUrls, 0, providers, emptyMap(), null); - } - /** * Create a {@code TestTopology} from a {@code ConfiguredStreamsApp}. It injects a {@link KafkaEndpointConfig} * for test purposes with Schema Registry optionally configured. @@ -189,7 +131,7 @@ public TestTopologyExtension createTopologyExtension( public Map getKafkaProperties(final ConfiguredStreamsApp app) { final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder() .bootstrapServers("localhost:9092") - .schemaRegistryUrl(this.schemaRegistryUrl) + .schemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()) .build(); return app.getKafkaProperties(endpointConfig); } From 4aebddeaf183a9e7ff7db170c6f8c8d394f7e248 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 23 Jan 2025 13:27:50 +0100 Subject: [PATCH 06/15] Update --- .../com/bakdata/kafka/TestApplicationHelper.java | 6 ++++-- .../bakdata/kafka/integration/RunStreamsAppTest.java | 3 +-- .../kafka/integration/StreamsCleanUpTest.java | 3 +-- .../bakdata/kafka/integration/StreamsRunnerTest.java | 12 +----------- .../kafka/CapturingUncaughtExceptionHandler.java | 0 5 files changed, 7 insertions(+), 17 deletions(-) rename {streams-bootstrap-cli-test => streams-bootstrap-test}/src/main/java/com/bakdata/kafka/CapturingUncaughtExceptionHandler.java (100%) diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java index 927f9c68..e4bf64db 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java @@ -34,6 +34,7 @@ @RequiredArgsConstructor public final class TestApplicationHelper { + private final @NonNull String bootstrapServers; @Getter private final @NonNull SchemaRegistryEnv schemaRegistryEnv; @@ -77,14 +78,15 @@ public TestTopology createTopologyExtension(final KafkaStreamsAppli return testTopologyFactory.createTopologyExtension(configuredApp); } - public KafkaTestClient newTestClient(final String bootstrapServers) { + public KafkaTestClient newTestClient() { return new KafkaTestClient(KafkaEndpointConfig.builder() - .bootstrapServers(bootstrapServers) + .bootstrapServers(this.bootstrapServers) .schemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()) .build()); } public void configure(final KafkaStreamsApplication app) { + app.setBootstrapServers(this.bootstrapServers); app.setSchemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()); } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index b0601a37..49e508c3 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -55,11 +55,10 @@ void shouldRunApp() { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(output); try (final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(Mirror::new)) { - app.setBootstrapServers(this.getBootstrapServers()); app.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir)); app.setInputTopics(List.of(input)); app.setOutputTopic(output); - new TestApplicationHelper(withoutSchemaRegistry()).runApplication(app); + new TestApplicationHelper(this.getBootstrapServers(), withoutSchemaRegistry()).runApplication(app); testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 00e10b12..7921933c 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -158,7 +158,7 @@ private void runAppAndClose(final KafkaStreamsApplication app) { } private void runApp(final KafkaStreamsApplication app) { - new TestApplicationHelper(withoutSchemaRegistry()).runApplication(app); + new TestApplicationHelper(this.getBootstrapServers(), withoutSchemaRegistry()).runApplication(app); // Wait until stream application has consumed all data this.awaitProcessing(app.createExecutableApp()); } @@ -198,7 +198,6 @@ private KafkaStreamsApplication createWordCountApplication() { } private > T configure(final T application) { - application.setBootstrapServers(this.getBootstrapServers()); application.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir)); return application; } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java index 061b6c5c..773edeb0 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.when; import com.bakdata.kafka.AppConfiguration; +import com.bakdata.kafka.CapturingUncaughtExceptionHandler; import com.bakdata.kafka.ConfiguredStreamsApp; import com.bakdata.kafka.KafkaTest; import com.bakdata.kafka.KafkaTestClient; @@ -48,7 +49,6 @@ import java.time.Duration; import java.util.List; import java.util.Map; -import lombok.Getter; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes.StringSerde; @@ -229,16 +229,6 @@ private ConfiguredStreamsApp createErrorApplication() { .build()); } - @Getter - private static class CapturingUncaughtExceptionHandler implements UncaughtExceptionHandler { - private Throwable lastException; - - @Override - public void uncaughtException(final Thread t, final Throwable e) { - this.lastException = e; - } - } - private static class ErrorApplication implements StreamsApp { @Override diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/CapturingUncaughtExceptionHandler.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/CapturingUncaughtExceptionHandler.java similarity index 100% rename from streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/CapturingUncaughtExceptionHandler.java rename to streams-bootstrap-test/src/main/java/com/bakdata/kafka/CapturingUncaughtExceptionHandler.java From 1d970bc7a849ae921708915a83ca576ca2777dfa Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 23 Jan 2025 13:46:14 +0100 Subject: [PATCH 07/15] Update --- .../bakdata/kafka/TestApplicationHelper.java | 34 +-------- .../bakdata/kafka/TestApplicationRunner.java | 73 +++++++++++++++++++ .../kafka/integration/RunStreamsAppTest.java | 4 +- .../kafka/integration/StreamsCleanUpTest.java | 9 ++- 4 files changed, 83 insertions(+), 37 deletions(-) create mode 100644 streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java index e4bf64db..2091b80c 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java @@ -25,40 +25,16 @@ package com.bakdata.kafka; import com.bakdata.fluent_kafka_streams_tests.TestTopology; -import java.lang.Thread.UncaughtExceptionHandler; import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; -import picocli.CommandLine; +@Getter @RequiredArgsConstructor public final class TestApplicationHelper { - private final @NonNull String bootstrapServers; - @Getter private final @NonNull SchemaRegistryEnv schemaRegistryEnv; - public Thread runApplication(final KafkaStreamsApplication app) { - this.configure(app); - new CommandLine(app); // initialize all mixins - app.onApplicationStart(); - final Thread thread = new Thread(app); - final UncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler(); - thread.setUncaughtExceptionHandler(handler); - thread.start(); - return thread; - } - - public ConsumerGroupVerifier verify(final KafkaStreamsApplication app) { - this.configure(app); - final KafkaEndpointConfig endpointConfig = app.getEndpointConfig(); - final KafkaTestClient testClient = new KafkaTestClient(endpointConfig); - try (final ConfiguredStreamsApp configuredApp = app.createConfiguredApp()) { - final String uniqueAppId = configuredApp.getUniqueAppId(); - return new ConsumerGroupVerifier(uniqueAppId, testClient::admin); - } - } - public ConfiguredStreamsApp createConfiguredApp( final KafkaStreamsApplication app) { this.configure(app); @@ -78,15 +54,7 @@ public TestTopology createTopologyExtension(final KafkaStreamsAppli return testTopologyFactory.createTopologyExtension(configuredApp); } - public KafkaTestClient newTestClient() { - return new KafkaTestClient(KafkaEndpointConfig.builder() - .bootstrapServers(this.bootstrapServers) - .schemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()) - .build()); - } - public void configure(final KafkaStreamsApplication app) { - app.setBootstrapServers(this.bootstrapServers); app.setSchemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()); } diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java new file mode 100644 index 00000000..b4667a15 --- /dev/null +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java @@ -0,0 +1,73 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.lang.Thread.UncaughtExceptionHandler; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import picocli.CommandLine; + +@Getter +@RequiredArgsConstructor +public final class TestApplicationRunner { + + private final @NonNull String bootstrapServers; + private final @NonNull SchemaRegistryEnv schemaRegistryEnv; + + public Thread runApplication(final KafkaStreamsApplication app) { + this.configure(app); + new CommandLine(app); // initialize all mixins + app.onApplicationStart(); + final Thread thread = new Thread(app); + final UncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler(); + thread.setUncaughtExceptionHandler(handler); + thread.start(); + return thread; + } + + public ConsumerGroupVerifier verify(final KafkaStreamsApplication app) { + this.configure(app); + final KafkaEndpointConfig endpointConfig = app.getEndpointConfig(); + final KafkaTestClient testClient = new KafkaTestClient(endpointConfig); + try (final ConfiguredStreamsApp configuredApp = app.createConfiguredApp()) { + final String uniqueAppId = configuredApp.getUniqueAppId(); + return new ConsumerGroupVerifier(uniqueAppId, testClient::admin); + } + } + + public KafkaTestClient newTestClient() { + return new KafkaTestClient(KafkaEndpointConfig.builder() + .bootstrapServers(this.bootstrapServers) + .schemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()) + .build()); + } + + public void configure(final KafkaStreamsApplication app) { + app.setBootstrapServers(this.bootstrapServers); + app.setSchemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()); + } + +} diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index 49e508c3..df37e796 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -32,7 +32,7 @@ import com.bakdata.kafka.KafkaTestClient; import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SimpleKafkaStreamsApplication; -import com.bakdata.kafka.TestApplicationHelper; +import com.bakdata.kafka.TestApplicationRunner; import com.bakdata.kafka.TestTopologyFactory; import com.bakdata.kafka.test_applications.Mirror; import java.nio.file.Path; @@ -58,7 +58,7 @@ void shouldRunApp() { app.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir)); app.setInputTopics(List.of(input)); app.setOutputTopic(output); - new TestApplicationHelper(this.getBootstrapServers(), withoutSchemaRegistry()).runApplication(app); + new TestApplicationRunner(this.getBootstrapServers(), withoutSchemaRegistry()).runApplication(app); testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 7921933c..6c88fae7 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -33,7 +33,7 @@ import com.bakdata.kafka.KafkaTestClient; import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SimpleKafkaStreamsApplication; -import com.bakdata.kafka.TestApplicationHelper; +import com.bakdata.kafka.TestApplicationRunner; import com.bakdata.kafka.TestTopologyFactory; import com.bakdata.kafka.test_applications.WordCount; import com.bakdata.kafka.util.ImprovedAdminClient; @@ -144,6 +144,7 @@ void shouldCallClose() { this.newTestClient().createTopic(app.getInputTopics().get(0)); this.softly.assertThat(app.isClosed()).isFalse(); this.softly.assertThat(app.isAppClosed()).isFalse(); + this.createTestHelper().configure(app); app.clean(); this.softly.assertThat(app.isAppClosed()).isTrue(); app.setAppClosed(false); @@ -158,11 +159,15 @@ private void runAppAndClose(final KafkaStreamsApplication app) { } private void runApp(final KafkaStreamsApplication app) { - new TestApplicationHelper(this.getBootstrapServers(), withoutSchemaRegistry()).runApplication(app); + this.createTestHelper().runApplication(app); // Wait until stream application has consumed all data this.awaitProcessing(app.createExecutableApp()); } + private TestApplicationRunner createTestHelper() { + return new TestApplicationRunner(this.getBootstrapServers(), withoutSchemaRegistry()); + } + private CloseFlagApp createCloseFlagApplication() { final CloseFlagApp app = new CloseFlagApp(); app.setInputTopics(List.of("input")); From 3433b25278b9acc0383c6ff34ead243ba63cf60b Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 23 Jan 2025 13:52:16 +0100 Subject: [PATCH 08/15] Update --- .../bakdata/kafka/TestApplicationRunner.java | 22 +++++++++++++++---- .../kafka/integration/RunStreamsAppTest.java | 2 +- .../kafka/integration/StreamsCleanUpTest.java | 21 ++++++++++++------ 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java index b4667a15..1b0595c6 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java @@ -37,10 +37,8 @@ public final class TestApplicationRunner { private final @NonNull String bootstrapServers; private final @NonNull SchemaRegistryEnv schemaRegistryEnv; - public Thread runApplication(final KafkaStreamsApplication app) { - this.configure(app); - new CommandLine(app); // initialize all mixins - app.onApplicationStart(); + public Thread run(final KafkaStreamsApplication app) { + this.prepareExecution(app); final Thread thread = new Thread(app); final UncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler(); thread.setUncaughtExceptionHandler(handler); @@ -48,6 +46,22 @@ public Thread runApplication(final KafkaStreamsApplication return thread; } + public void clean(final KafkaStreamsApplication app) { + this.prepareExecution(app); + app.clean(); + } + + public void reset(final KafkaStreamsApplication app) { + this.prepareExecution(app); + app.reset(); + } + + public void prepareExecution(final KafkaStreamsApplication app) { + this.configure(app); + new CommandLine(app); // initialize all mixins + app.onApplicationStart(); + } + public ConsumerGroupVerifier verify(final KafkaStreamsApplication app) { this.configure(app); final KafkaEndpointConfig endpointConfig = app.getEndpointConfig(); diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index df37e796..85188a20 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -58,7 +58,7 @@ void shouldRunApp() { app.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir)); app.setInputTopics(List.of(input)); app.setOutputTopic(output); - new TestApplicationRunner(this.getBootstrapServers(), withoutSchemaRegistry()).runApplication(app); + new TestApplicationRunner(this.getBootstrapServers(), withoutSchemaRegistry()).run(app); testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 6c88fae7..3eeae5b7 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -87,7 +87,7 @@ void shouldClean() { // Wait until all stream applications are completely stopped before triggering cleanup this.awaitClosed(app.createExecutableApp()); - app.clean(); + this.clean(app); try (final ImprovedAdminClient admin = testClient.admin()) { this.softly.assertThat(admin.getTopicClient().exists(app.getOutputTopic())) @@ -123,7 +123,7 @@ void shouldReset() { // Wait until all stream applications are completely stopped before triggering cleanup this.awaitClosed(app.createExecutableApp()); - app.reset(); + this.reset(app); try (final ImprovedAdminClient admin = testClient.admin()) { this.softly.assertThat(admin.getTopicClient().exists(app.getOutputTopic())) @@ -144,27 +144,34 @@ void shouldCallClose() { this.newTestClient().createTopic(app.getInputTopics().get(0)); this.softly.assertThat(app.isClosed()).isFalse(); this.softly.assertThat(app.isAppClosed()).isFalse(); - this.createTestHelper().configure(app); - app.clean(); + this.clean(app); this.softly.assertThat(app.isAppClosed()).isTrue(); app.setAppClosed(false); - app.reset(); + this.reset(app); this.softly.assertThat(app.isAppClosed()).isTrue(); } } + private void clean(final KafkaStreamsApplication app) { + this.createTestRunner().clean(app); + } + + private void reset(final KafkaStreamsApplication app) { + this.createTestRunner().reset(app); + } + private void runAppAndClose(final KafkaStreamsApplication app) { this.runApp(app); app.stop(); } private void runApp(final KafkaStreamsApplication app) { - this.createTestHelper().runApplication(app); + this.createTestRunner().run(app); // Wait until stream application has consumed all data this.awaitProcessing(app.createExecutableApp()); } - private TestApplicationRunner createTestHelper() { + private TestApplicationRunner createTestRunner() { return new TestApplicationRunner(this.getBootstrapServers(), withoutSchemaRegistry()); } From 17ed85bb2b84de78c978d21b99cdba088a765b81 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 23 Jan 2025 15:11:35 +0100 Subject: [PATCH 09/15] Update --- .../bakdata/kafka/TestApplicationRunner.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java index 1b0595c6..6c5a3590 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java @@ -24,7 +24,10 @@ package com.bakdata.kafka; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; import java.lang.Thread.UncaughtExceptionHandler; +import java.util.List; import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -37,15 +40,30 @@ public final class TestApplicationRunner { private final @NonNull String bootstrapServers; private final @NonNull SchemaRegistryEnv schemaRegistryEnv; - public Thread run(final KafkaStreamsApplication app) { - this.prepareExecution(app); - final Thread thread = new Thread(app); + private static Thread start(final Runnable runnable) { + final Thread thread = new Thread(runnable); final UncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler(); thread.setUncaughtExceptionHandler(handler); thread.start(); return thread; } + public Thread start(final KafkaStreamsApplication app, final String[] args) { + final Builder argBuilder = ImmutableList.builder() + .add(args) + .add("--bootstrap-servers", this.bootstrapServers); + if (this.schemaRegistryEnv.getSchemaRegistryUrl() != null) { + argBuilder.add("--schema-registry-url", this.schemaRegistryEnv.getSchemaRegistryUrl()); + } + final List newArgs = argBuilder.build(); + return start(() -> KafkaApplication.startApplicationWithoutExit(app, newArgs.toArray(new String[0]))); + } + + public Thread run(final KafkaStreamsApplication app) { + this.prepareExecution(app); + return start(app); + } + public void clean(final KafkaStreamsApplication app) { this.prepareExecution(app); app.clean(); From ad7f7ec9d89e4022844980b3fe615d8df48e583d Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 23 Jan 2025 15:16:22 +0100 Subject: [PATCH 10/15] Update --- .../bakdata/kafka/TestApplicationRunner.java | 34 ++++++++++++++----- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java index 6c5a3590..55636f8d 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java @@ -48,15 +48,19 @@ private static Thread start(final Runnable runnable) { return thread; } - public Thread start(final KafkaStreamsApplication app, final String[] args) { - final Builder argBuilder = ImmutableList.builder() - .add(args) - .add("--bootstrap-servers", this.bootstrapServers); - if (this.schemaRegistryEnv.getSchemaRegistryUrl() != null) { - argBuilder.add("--schema-registry-url", this.schemaRegistryEnv.getSchemaRegistryUrl()); - } - final List newArgs = argBuilder.build(); - return start(() -> KafkaApplication.startApplicationWithoutExit(app, newArgs.toArray(new String[0]))); + public Thread run(final KafkaStreamsApplication app, final String[] args) { + final String[] newArgs = this.setupArgs(args, "run"); + return start(() -> KafkaApplication.startApplicationWithoutExit(app, newArgs)); + } + + public int clean(final KafkaStreamsApplication app, final String[] args) { + final String[] newArgs = this.setupArgs(args, "clean"); + return KafkaApplication.startApplicationWithoutExit(app, newArgs); + } + + public int reset(final KafkaStreamsApplication app, final String[] args) { + final String[] newArgs = this.setupArgs(args, "reset"); + return KafkaApplication.startApplicationWithoutExit(app, newArgs); } public Thread run(final KafkaStreamsApplication app) { @@ -102,4 +106,16 @@ public void configure(final KafkaStreamsApplication app) { app.setSchemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()); } + private String[] setupArgs(final String[] args, final String command) { + final Builder argBuilder = ImmutableList.builder() + .add(args) + .add("--bootstrap-servers", this.bootstrapServers); + if (this.schemaRegistryEnv.getSchemaRegistryUrl() != null) { + argBuilder.add("--schema-registry-url", this.schemaRegistryEnv.getSchemaRegistryUrl()); + } + argBuilder.add(command); + final List newArgs = argBuilder.build(); + return newArgs.toArray(new String[0]); + } + } From 1141109a0144c3fb52cd32f589acfe6a1730ec85 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 23 Jan 2025 15:21:43 +0100 Subject: [PATCH 11/15] Update --- .../main/java/com/bakdata/kafka/TestApplicationRunner.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java index 55636f8d..f98bf160 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java @@ -49,7 +49,7 @@ private static Thread start(final Runnable runnable) { } public Thread run(final KafkaStreamsApplication app, final String[] args) { - final String[] newArgs = this.setupArgs(args, "run"); + final String[] newArgs = this.setupArgs(args, null); return start(() -> KafkaApplication.startApplicationWithoutExit(app, newArgs)); } @@ -113,7 +113,9 @@ private String[] setupArgs(final String[] args, final String command) { if (this.schemaRegistryEnv.getSchemaRegistryUrl() != null) { argBuilder.add("--schema-registry-url", this.schemaRegistryEnv.getSchemaRegistryUrl()); } - argBuilder.add(command); + if (command != null) { + argBuilder.add(command); + } final List newArgs = argBuilder.build(); return newArgs.toArray(new String[0]); } From 56c3889b94977ccaa3bbc1739426ca95f5161b26 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 23 Jan 2025 15:34:14 +0100 Subject: [PATCH 12/15] Update --- .../com/bakdata/kafka/TestApplicationRunner.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java index f98bf160..72509d4d 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java @@ -24,6 +24,8 @@ package com.bakdata.kafka; +import static java.util.Collections.emptyList; + import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; import java.lang.Thread.UncaughtExceptionHandler; @@ -49,17 +51,17 @@ private static Thread start(final Runnable runnable) { } public Thread run(final KafkaStreamsApplication app, final String[] args) { - final String[] newArgs = this.setupArgs(args, null); + final String[] newArgs = this.setupArgs(args, emptyList()); return start(() -> KafkaApplication.startApplicationWithoutExit(app, newArgs)); } public int clean(final KafkaStreamsApplication app, final String[] args) { - final String[] newArgs = this.setupArgs(args, "clean"); + final String[] newArgs = this.setupArgs(args, List.of("clean")); return KafkaApplication.startApplicationWithoutExit(app, newArgs); } public int reset(final KafkaStreamsApplication app, final String[] args) { - final String[] newArgs = this.setupArgs(args, "reset"); + final String[] newArgs = this.setupArgs(args, List.of("reset")); return KafkaApplication.startApplicationWithoutExit(app, newArgs); } @@ -106,16 +108,14 @@ public void configure(final KafkaStreamsApplication app) { app.setSchemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()); } - private String[] setupArgs(final String[] args, final String command) { + private String[] setupArgs(final String[] args, final Iterable command) { final Builder argBuilder = ImmutableList.builder() .add(args) - .add("--bootstrap-servers", this.bootstrapServers); + .add("--bootstrap-servers", this.bootstrapServers) + .addAll(command); if (this.schemaRegistryEnv.getSchemaRegistryUrl() != null) { argBuilder.add("--schema-registry-url", this.schemaRegistryEnv.getSchemaRegistryUrl()); } - if (command != null) { - argBuilder.add(command); - } final List newArgs = argBuilder.build(); return newArgs.toArray(new String[0]); } From f16dd1031ed29b0f9ff9111e5ce3129ff6e3b5bd Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 23 Jan 2025 15:44:14 +0100 Subject: [PATCH 13/15] Update --- .../bakdata/kafka/TestApplicationRunner.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java index 72509d4d..cd041def 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java @@ -42,17 +42,10 @@ public final class TestApplicationRunner { private final @NonNull String bootstrapServers; private final @NonNull SchemaRegistryEnv schemaRegistryEnv; - private static Thread start(final Runnable runnable) { - final Thread thread = new Thread(runnable); - final UncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler(); - thread.setUncaughtExceptionHandler(handler); - thread.start(); - return thread; - } - - public Thread run(final KafkaStreamsApplication app, final String[] args) { + public void run(final KafkaStreamsApplication app, final String[] args) { final String[] newArgs = this.setupArgs(args, emptyList()); - return start(() -> KafkaApplication.startApplicationWithoutExit(app, newArgs)); + final Thread thread = new Thread(() -> KafkaApplication.startApplicationWithoutExit(app, newArgs)); + thread.start(); } public int clean(final KafkaStreamsApplication app, final String[] args) { @@ -67,7 +60,11 @@ public int reset(final KafkaStreamsApplication app, final public Thread run(final KafkaStreamsApplication app) { this.prepareExecution(app); - return start(app); + final Thread thread = new Thread(app); + final UncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler(); + thread.setUncaughtExceptionHandler(handler); + thread.start(); + return thread; } public void clean(final KafkaStreamsApplication app) { From 426cd9201b41fba38c65699aa8dc84861db401b4 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 27 Jan 2025 13:04:38 +0100 Subject: [PATCH 14/15] Add error handling --- .../com/bakdata/kafka/TestApplicationHelper.java | 6 +++--- .../com/bakdata/kafka/TestApplicationRunner.java | 12 +++++------- .../bakdata/kafka/integration/RunStreamsAppTest.java | 2 +- .../kafka/integration/StreamsCleanUpTest.java | 2 +- .../test/java/com/bakdata/kafka/AvroMirrorTest.java | 2 +- .../java/com/bakdata/kafka/KafkaTest.java | 6 +++--- .../{SchemaRegistryEnv.java => TestEnvironment.java} | 10 +++++----- .../java/com/bakdata/kafka/TestTopologyFactory.java | 4 ++-- 8 files changed, 21 insertions(+), 23 deletions(-) rename streams-bootstrap-test/src/main/java/com/bakdata/kafka/{SchemaRegistryEnv.java => TestEnvironment.java} (92%) diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java index 2091b80c..53487bcf 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java @@ -33,7 +33,7 @@ @RequiredArgsConstructor public final class TestApplicationHelper { - private final @NonNull SchemaRegistryEnv schemaRegistryEnv; + private final @NonNull TestEnvironment environment; public ConfiguredStreamsApp createConfiguredApp( final KafkaStreamsApplication app) { @@ -55,11 +55,11 @@ public TestTopology createTopologyExtension(final KafkaStreamsAppli } public void configure(final KafkaStreamsApplication app) { - app.setSchemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()); + app.setSchemaRegistryUrl(this.environment.getSchemaRegistryUrl()); } private TestTopologyFactory createTestTopologyFactory() { - return new TestTopologyFactory(this.schemaRegistryEnv); + return new TestTopologyFactory(this.environment); } } diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java index cd041def..f5f69f81 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java @@ -33,14 +33,13 @@ import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; -import picocli.CommandLine; @Getter @RequiredArgsConstructor public final class TestApplicationRunner { private final @NonNull String bootstrapServers; - private final @NonNull SchemaRegistryEnv schemaRegistryEnv; + private final @NonNull TestEnvironment environment; public void run(final KafkaStreamsApplication app, final String[] args) { final String[] newArgs = this.setupArgs(args, emptyList()); @@ -79,7 +78,6 @@ public void reset(final KafkaStreamsApplication app) { public void prepareExecution(final KafkaStreamsApplication app) { this.configure(app); - new CommandLine(app); // initialize all mixins app.onApplicationStart(); } @@ -96,13 +94,13 @@ public ConsumerGroupVerifier verify(final KafkaStreamsApplication app) { app.setBootstrapServers(this.bootstrapServers); - app.setSchemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()); + app.setSchemaRegistryUrl(this.environment.getSchemaRegistryUrl()); } private String[] setupArgs(final String[] args, final Iterable command) { @@ -110,8 +108,8 @@ private String[] setupArgs(final String[] args, final Iterable command) .add(args) .add("--bootstrap-servers", this.bootstrapServers) .addAll(command); - if (this.schemaRegistryEnv.getSchemaRegistryUrl() != null) { - argBuilder.add("--schema-registry-url", this.schemaRegistryEnv.getSchemaRegistryUrl()); + if (this.environment.getSchemaRegistryUrl() != null) { + argBuilder.add("--schema-registry-url", this.environment.getSchemaRegistryUrl()); } final List newArgs = argBuilder.build(); return newArgs.toArray(new String[0]); diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index 85188a20..221f4355 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -24,7 +24,7 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.SchemaRegistryEnv.withoutSchemaRegistry; +import static com.bakdata.kafka.TestEnvironment.withoutSchemaRegistry; import static org.assertj.core.api.Assertions.assertThat; import com.bakdata.kafka.KafkaStreamsApplication; diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 3eeae5b7..c8af8d5e 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -25,7 +25,7 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.SchemaRegistryEnv.withoutSchemaRegistry; +import static com.bakdata.kafka.TestEnvironment.withoutSchemaRegistry; import com.bakdata.kafka.CloseFlagApp; import com.bakdata.kafka.KafkaStreamsApplication; diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java index 3ddad2f2..471a3b31 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java @@ -24,7 +24,7 @@ package com.bakdata.kafka; -import static com.bakdata.kafka.SchemaRegistryEnv.withSchemaRegistry; +import static com.bakdata.kafka.TestEnvironment.withSchemaRegistry; import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension; import com.bakdata.kafka.test_applications.MirrorWithNonDefaultSerde; diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java index 3e8b094e..3993d57e 100644 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java @@ -36,7 +36,7 @@ @Testcontainers public abstract class KafkaTest { protected static final Duration POLL_TIMEOUT = Duration.ofSeconds(10); - private final SchemaRegistryEnv schemaRegistryEnv = SchemaRegistryEnv.withSchemaRegistry(); + private final TestEnvironment environment = TestEnvironment.withSchemaRegistry(); @Container private final KafkaContainer kafkaCluster = newCluster(); @@ -73,11 +73,11 @@ protected KafkaTestClient newTestClient() { } protected String getSchemaRegistryUrl() { - return this.schemaRegistryEnv.getSchemaRegistryUrl(); + return this.environment.getSchemaRegistryUrl(); } protected SchemaRegistryClient getSchemaRegistryClient() { - return this.schemaRegistryEnv.getSchemaRegistryClient(); + return this.environment.getSchemaRegistryClient(); } protected void awaitProcessing(final ExecutableStreamsApp app) { diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/SchemaRegistryEnv.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestEnvironment.java similarity index 92% rename from streams-bootstrap-test/src/main/java/com/bakdata/kafka/SchemaRegistryEnv.java rename to streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestEnvironment.java index 53438d25..e10110de 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/SchemaRegistryEnv.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestEnvironment.java @@ -41,7 +41,7 @@ */ @RequiredArgsConstructor(access = AccessLevel.PRIVATE) @Getter -public final class SchemaRegistryEnv { +public final class TestEnvironment { private static final String MOCK_URL_PREFIX = "mock://"; private final String schemaRegistryUrl; @@ -50,7 +50,7 @@ public final class SchemaRegistryEnv { * Create a new {@code SchemaRegistryEnv} with no configured Schema Registry. * @return {@code SchemaRegistryEnv} with no configured Schema Registry */ - public static SchemaRegistryEnv withoutSchemaRegistry() { + public static TestEnvironment withoutSchemaRegistry() { return withSchemaRegistry(null); } @@ -60,7 +60,7 @@ public static SchemaRegistryEnv withoutSchemaRegistry() { * collisions between different test instances as scopes are retained globally. * @return {@code SchemaRegistryEnv} with configured Schema Registry */ - public static SchemaRegistryEnv withSchemaRegistry() { + public static TestEnvironment withSchemaRegistry() { return withSchemaRegistry(MOCK_URL_PREFIX + UUID.randomUUID()); } @@ -69,8 +69,8 @@ public static SchemaRegistryEnv withSchemaRegistry() { * @param schemaRegistryUrl Schema Registry URL to use * @return {@code SchemaRegistryEnv} with configured Schema Registry */ - public static SchemaRegistryEnv withSchemaRegistry(final String schemaRegistryUrl) { - return new SchemaRegistryEnv(schemaRegistryUrl); + public static TestEnvironment withSchemaRegistry(final String schemaRegistryUrl) { + return new TestEnvironment(schemaRegistryUrl); } /** diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java index 9e4b8339..276b075b 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java @@ -47,7 +47,7 @@ public final class TestTopologyFactory { StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Long.toString(0L), ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(10_000) ); - private final @NonNull SchemaRegistryEnv schemaRegistryEnv; + private final @NonNull TestEnvironment environment; /** * Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and @@ -131,7 +131,7 @@ public TestTopologyExtension createTopologyExtension( public Map getKafkaProperties(final ConfiguredStreamsApp app) { final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder() .bootstrapServers("localhost:9092") - .schemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl()) + .schemaRegistryUrl(this.environment.getSchemaRegistryUrl()) .build(); return app.getKafkaProperties(endpointConfig); } From d745ed8338d6d11fd65f22fa3ad7878935c43d9b Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 27 Jan 2025 15:45:31 +0100 Subject: [PATCH 15/15] Update --- .../main/java/com/bakdata/kafka/TestApplicationRunner.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java index f5f69f81..a7894e93 100644 --- a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java @@ -106,12 +106,13 @@ public void configure(final KafkaStreamsApplication app) { private String[] setupArgs(final String[] args, final Iterable command) { final Builder argBuilder = ImmutableList.builder() .add(args) - .add("--bootstrap-servers", this.bootstrapServers) - .addAll(command); + .add("--bootstrap-servers", this.bootstrapServers); if (this.environment.getSchemaRegistryUrl() != null) { argBuilder.add("--schema-registry-url", this.environment.getSchemaRegistryUrl()); } - final List newArgs = argBuilder.build(); + final List newArgs = argBuilder + .addAll(command) + .build(); return newArgs.toArray(new String[0]); }