diff --git a/settings.gradle b/settings.gradle index e9057df9..05c49fdb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -11,4 +11,5 @@ include( ":streams-bootstrap-test", ":streams-bootstrap-large-messages", ":streams-bootstrap-cli", + ":streams-bootstrap-cli-test", ) diff --git a/streams-bootstrap-cli-test/build.gradle.kts b/streams-bootstrap-cli-test/build.gradle.kts new file mode 100644 index 00000000..3779fdb0 --- /dev/null +++ b/streams-bootstrap-cli-test/build.gradle.kts @@ -0,0 +1,6 @@ +description = "Utils for testing your Kafka Streams Application" + +dependencies { + api(project(":streams-bootstrap-test")) + api(project(":streams-bootstrap-cli")) +} diff --git a/streams-bootstrap-cli-test/lombok.config b/streams-bootstrap-cli-test/lombok.config new file mode 100644 index 00000000..189c0bef --- /dev/null +++ b/streams-bootstrap-cli-test/lombok.config @@ -0,0 +1,3 @@ +# This file is generated by the 'io.freefair.lombok' Gradle plugin +config.stopBubbling = true +lombok.addLombokGeneratedAnnotation = true diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java new file mode 100644 index 00000000..53487bcf --- /dev/null +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationHelper.java @@ -0,0 +1,65 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.bakdata.kafka; + +import com.bakdata.fluent_kafka_streams_tests.TestTopology; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +public final class TestApplicationHelper { + + private final @NonNull TestEnvironment environment; + + public ConfiguredStreamsApp createConfiguredApp( + final KafkaStreamsApplication app) { + this.configure(app); + app.prepareRun(); + return app.createConfiguredApp(); + } + + public TestTopology createTopology(final KafkaStreamsApplication app) { + final ConfiguredStreamsApp configuredApp = this.createConfiguredApp(app); + final TestTopologyFactory testTopologyFactory = this.createTestTopologyFactory(); + return testTopologyFactory.createTopology(configuredApp); + } + + public TestTopology createTopologyExtension(final KafkaStreamsApplication app) { + final ConfiguredStreamsApp configuredApp = this.createConfiguredApp(app); + final TestTopologyFactory testTopologyFactory = this.createTestTopologyFactory(); + return testTopologyFactory.createTopologyExtension(configuredApp); + } + + public void configure(final KafkaStreamsApplication app) { + app.setSchemaRegistryUrl(this.environment.getSchemaRegistryUrl()); + } + + private TestTopologyFactory createTestTopologyFactory() { + return new TestTopologyFactory(this.environment); + } + +} diff --git a/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java new file mode 100644 index 00000000..a7894e93 --- /dev/null +++ b/streams-bootstrap-cli-test/src/main/java/com/bakdata/kafka/TestApplicationRunner.java @@ -0,0 +1,119 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.bakdata.kafka; + +import static java.util.Collections.emptyList; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.List; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +public final class TestApplicationRunner { + + private final @NonNull String bootstrapServers; + private final @NonNull TestEnvironment environment; + + public void run(final KafkaStreamsApplication app, final String[] args) { + final String[] newArgs = this.setupArgs(args, emptyList()); + final Thread thread = new Thread(() -> KafkaApplication.startApplicationWithoutExit(app, newArgs)); + thread.start(); + } + + public int clean(final KafkaStreamsApplication app, final String[] args) { + final String[] newArgs = this.setupArgs(args, List.of("clean")); + return KafkaApplication.startApplicationWithoutExit(app, newArgs); + } + + public int reset(final KafkaStreamsApplication app, final String[] args) { + final String[] newArgs = this.setupArgs(args, List.of("reset")); + return KafkaApplication.startApplicationWithoutExit(app, newArgs); + } + + public Thread run(final KafkaStreamsApplication app) { + this.prepareExecution(app); + final Thread thread = new Thread(app); + final UncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler(); + thread.setUncaughtExceptionHandler(handler); + thread.start(); + return thread; + } + + public void clean(final KafkaStreamsApplication app) { + this.prepareExecution(app); + app.clean(); + } + + public void reset(final KafkaStreamsApplication app) { + this.prepareExecution(app); + app.reset(); + } + + public void prepareExecution(final KafkaStreamsApplication app) { + this.configure(app); + app.onApplicationStart(); + } + + public ConsumerGroupVerifier verify(final KafkaStreamsApplication app) { + this.configure(app); + final KafkaEndpointConfig endpointConfig = app.getEndpointConfig(); + final KafkaTestClient testClient = new KafkaTestClient(endpointConfig); + try (final ConfiguredStreamsApp configuredApp = app.createConfiguredApp()) { + final String uniqueAppId = configuredApp.getUniqueAppId(); + return new ConsumerGroupVerifier(uniqueAppId, testClient::admin); + } + } + + public KafkaTestClient newTestClient() { + return new KafkaTestClient(KafkaEndpointConfig.builder() + .bootstrapServers(this.bootstrapServers) + .schemaRegistryUrl(this.environment.getSchemaRegistryUrl()) + .build()); + } + + public void configure(final KafkaStreamsApplication app) { + app.setBootstrapServers(this.bootstrapServers); + app.setSchemaRegistryUrl(this.environment.getSchemaRegistryUrl()); + } + + private String[] setupArgs(final String[] args, final Iterable command) { + final Builder argBuilder = ImmutableList.builder() + .add(args) + .add("--bootstrap-servers", this.bootstrapServers); + if (this.environment.getSchemaRegistryUrl() != null) { + argBuilder.add("--schema-registry-url", this.environment.getSchemaRegistryUrl()); + } + final List newArgs = argBuilder + .addAll(command) + .build(); + return newArgs.toArray(new String[0]); + } + +} diff --git a/streams-bootstrap-cli/build.gradle.kts b/streams-bootstrap-cli/build.gradle.kts index 7cc8ba99..7a3d7dd0 100644 --- a/streams-bootstrap-cli/build.gradle.kts +++ b/streams-bootstrap-cli/build.gradle.kts @@ -18,6 +18,7 @@ dependencies { testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion) 
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion) testImplementation(testFixtures(project(":streams-bootstrap-core"))) + testImplementation(project(":streams-bootstrap-cli-test")) testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2") val confluentVersion: String by project testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index 15b68e12..aeaded91 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -257,36 +257,36 @@ public final CleanableApp createCleanableApp() { return cleanableApp; } - /** - * Create a new {@code ConfiguredApp} that will be executed according to the given config. - * - * @param app app to configure. - * @param configuration configuration for app - * @return {@code ConfiguredApp} - */ - protected abstract CA createConfiguredApp(final A app, AppConfiguration configuration); - /** * Called before starting the application, e.g., invoking {@link #run()} */ - protected void onApplicationStart() { + public void onApplicationStart() { // do nothing by default } /** * Called before running the application, i.e., invoking {@link #run()} */ - protected void prepareRun() { + public void prepareRun() { // do nothing by default } /** * Called before cleaning the application, i.e., invoking {@link #clean()} */ - protected void prepareClean() { + public void prepareClean() { // do nothing by default } + /** + * Create a new {@code ConfiguredApp} that will be executed according to the given config. + * + * @param app app to configure. 
+ * @param configuration configuration for app + * @return {@code ConfiguredApp} + */ + protected abstract CA createConfiguredApp(final A app, AppConfiguration configuration); + private void startApplication() { Runtime.getRuntime().addShutdownHook(new Thread(this::close)); this.onApplicationStart(); diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 6f561ad5..f965244a 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -154,7 +154,7 @@ public final ConfiguredStreamsApp createConfiguredApp(final T app, * Called before cleaning the application, i.e., invoking {@link #clean()} or {@link #reset()} */ @Override - protected void prepareClean() { + public void prepareClean() { super.prepareClean(); } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index b3a99c2f..221f4355 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -24,6 +24,7 @@ package com.bakdata.kafka.integration; +import static com.bakdata.kafka.TestEnvironment.withoutSchemaRegistry; import static org.assertj.core.api.Assertions.assertThat; import com.bakdata.kafka.KafkaStreamsApplication; @@ -31,6 +32,7 @@ import com.bakdata.kafka.KafkaTestClient; import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SimpleKafkaStreamsApplication; +import com.bakdata.kafka.TestApplicationRunner; import com.bakdata.kafka.TestTopologyFactory; import com.bakdata.kafka.test_applications.Mirror; import java.nio.file.Path; @@ -53,12 +55,10 @@ void shouldRunApp() { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(output); try (final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(Mirror::new)) { - app.setBootstrapServers(this.getBootstrapServers()); app.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir)); app.setInputTopics(List.of(input)); app.setOutputTopic(output); - // run in Thread because the application blocks indefinitely - new Thread(app).start(); + new TestApplicationRunner(this.getBootstrapServers(), withoutSchemaRegistry()).run(app); testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 200af6f0..c8af8d5e 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -25,12 +25,15 @@ package com.bakdata.kafka.integration; +import static 
com.bakdata.kafka.TestEnvironment.withoutSchemaRegistry; + import com.bakdata.kafka.CloseFlagApp; import com.bakdata.kafka.KafkaStreamsApplication; import com.bakdata.kafka.KafkaTest; import com.bakdata.kafka.KafkaTestClient; import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SimpleKafkaStreamsApplication; +import com.bakdata.kafka.TestApplicationRunner; import com.bakdata.kafka.TestTopologyFactory; import com.bakdata.kafka.test_applications.WordCount; import com.bakdata.kafka.util.ImprovedAdminClient; @@ -84,7 +87,7 @@ void shouldClean() { // Wait until all stream applications are completely stopped before triggering cleanup this.awaitClosed(app.createExecutableApp()); - app.clean(); + this.clean(app); try (final ImprovedAdminClient admin = testClient.admin()) { this.softly.assertThat(admin.getTopicClient().exists(app.getOutputTopic())) @@ -120,7 +123,7 @@ void shouldReset() { // Wait until all stream applications are completely stopped before triggering cleanup this.awaitClosed(app.createExecutableApp()); - app.reset(); + this.reset(app); try (final ImprovedAdminClient admin = testClient.admin()) { this.softly.assertThat(admin.getTopicClient().exists(app.getOutputTopic())) @@ -141,26 +144,37 @@ void shouldCallClose() { this.newTestClient().createTopic(app.getInputTopics().get(0)); this.softly.assertThat(app.isClosed()).isFalse(); this.softly.assertThat(app.isAppClosed()).isFalse(); - app.clean(); + this.clean(app); this.softly.assertThat(app.isAppClosed()).isTrue(); app.setAppClosed(false); - app.reset(); + this.reset(app); this.softly.assertThat(app.isAppClosed()).isTrue(); } } + private void clean(final KafkaStreamsApplication app) { + this.createTestRunner().clean(app); + } + + private void reset(final KafkaStreamsApplication app) { + this.createTestRunner().reset(app); + } + private void runAppAndClose(final KafkaStreamsApplication app) { this.runApp(app); app.stop(); } private void runApp(final KafkaStreamsApplication app) { - // run in Thread because the application blocks indefinitely - new Thread(app).start(); + this.createTestRunner().run(app); // Wait until stream application has consumed all data this.awaitProcessing(app.createExecutableApp()); } + private TestApplicationRunner createTestRunner() { + return new TestApplicationRunner(this.getBootstrapServers(), withoutSchemaRegistry()); + } + private CloseFlagApp createCloseFlagApplication() { final CloseFlagApp app = new CloseFlagApp(); app.setInputTopics(List.of("input")); @@ -196,7 +210,6 @@ private KafkaStreamsApplication createWordCountApplication() { } private > T configure(final T application) { - application.setBootstrapServers(this.getBootstrapServers()); application.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir)); return application; } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java index 553fde3b..471a3b31 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java @@ -24,6 +24,8 @@ package com.bakdata.kafka; +import static com.bakdata.kafka.TestEnvironment.withSchemaRegistry; + import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension; import com.bakdata.kafka.test_applications.MirrorWithNonDefaultSerde; import java.util.List; @@ -35,7 +37,7 @@ class AvroMirrorTest { private final ConfiguredStreamsApp app = 
createApp(); @RegisterExtension final TestTopologyExtension testTopology = - TestTopologyFactory.withSchemaRegistry().createTopologyExtension(this.app); + new TestTopologyFactory(withSchemaRegistry()).createTopologyExtension(this.app); private static ConfiguredStreamsApp createApp() { final AppConfiguration configuration = new AppConfiguration<>(StreamsTopicConfig.builder() diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java index 061b6c5c..773edeb0 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.when; import com.bakdata.kafka.AppConfiguration; +import com.bakdata.kafka.CapturingUncaughtExceptionHandler; import com.bakdata.kafka.ConfiguredStreamsApp; import com.bakdata.kafka.KafkaTest; import com.bakdata.kafka.KafkaTestClient; @@ -48,7 +49,6 @@ import java.time.Duration; import java.util.List; import java.util.Map; -import lombok.Getter; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes.StringSerde; @@ -229,16 +229,6 @@ private ConfiguredStreamsApp createErrorApplication() { .build()); } - @Getter - private static class CapturingUncaughtExceptionHandler implements UncaughtExceptionHandler { - private Throwable lastException; - - @Override - public void uncaughtException(final Thread t, final Throwable e) { - this.lastException = e; - } - } - private static class ErrorApplication implements StreamsApp { @Override diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java index 20a05f59..3993d57e 100644 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java @@ -36,7 +36,7 @@ @Testcontainers public abstract class KafkaTest { protected static final Duration POLL_TIMEOUT = Duration.ofSeconds(10); - private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry(); + private final TestEnvironment environment = TestEnvironment.withSchemaRegistry(); @Container private final KafkaContainer kafkaCluster = newCluster(); @@ -73,11 +73,11 @@ protected KafkaTestClient newTestClient() { } protected String getSchemaRegistryUrl() { - return this.testTopologyFactory.getSchemaRegistryUrl(); + return this.environment.getSchemaRegistryUrl(); } protected SchemaRegistryClient getSchemaRegistryClient() { - return this.testTopologyFactory.getSchemaRegistryClient(); + return this.environment.getSchemaRegistryClient(); } protected void awaitProcessing(final ExecutableStreamsApp app) { diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/CapturingUncaughtExceptionHandler.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/CapturingUncaughtExceptionHandler.java new file mode 100644 index 00000000..2b83dc72 --- /dev/null +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/CapturingUncaughtExceptionHandler.java @@ -0,0 +1,38 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this 
software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.lang.Thread.UncaughtExceptionHandler; +import lombok.Getter; + +@Getter +public class CapturingUncaughtExceptionHandler implements UncaughtExceptionHandler { + private Throwable lastException; + + @Override + public void uncaughtException(final Thread t, final Throwable e) { + this.lastException = e; + } +} diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestEnvironment.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestEnvironment.java new file mode 100644 index 00000000..e10110de --- /dev/null +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestEnvironment.java @@ -0,0 +1,97 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import static java.util.Collections.emptyMap; + +import io.confluent.kafka.schemaregistry.SchemaProvider; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Class that provides helpers for using schema registry in tests. 
+ */ +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +@Getter +public final class TestEnvironment { + + private static final String MOCK_URL_PREFIX = "mock://"; + private final String schemaRegistryUrl; + + /** + * Create a new {@code SchemaRegistryEnv} with no configured Schema Registry. + * @return {@code SchemaRegistryEnv} with no configured Schema Registry + */ + public static TestEnvironment withoutSchemaRegistry() { + return withSchemaRegistry(null); + } + + /** + * Create a new {@code SchemaRegistryEnv} with configured + * {@link io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry}. The scope is random in order to avoid + * collisions between different test instances as scopes are retained globally. + * @return {@code SchemaRegistryEnv} with configured Schema Registry + */ + public static TestEnvironment withSchemaRegistry() { + return withSchemaRegistry(MOCK_URL_PREFIX + UUID.randomUUID()); + } + + /** + * Create a new {@code SchemaRegistryEnv} with configured Schema Registry. + * @param schemaRegistryUrl Schema Registry URL to use + * @return {@code SchemaRegistryEnv} with configured Schema Registry + */ + public static TestEnvironment withSchemaRegistry(final String schemaRegistryUrl) { + return new TestEnvironment(schemaRegistryUrl); + } + + /** + * Get {@code SchemaRegistryClient} for configured URL with default providers + * @return {@code SchemaRegistryClient} + * @throws NullPointerException if Schema Registry is not configured + */ + public SchemaRegistryClient getSchemaRegistryClient() { + return this.getSchemaRegistryClient(null); + } + + /** + * Get {@code SchemaRegistryClient} for configured URL + * @param providers list of {@code SchemaProvider} to use for {@code SchemaRegistryClient} + * @return {@code SchemaRegistryClient} + * @throws NullPointerException if Schema Registry is not configured + */ + public SchemaRegistryClient getSchemaRegistryClient(final List providers) { + final List baseUrls = List.of( + Objects.requireNonNull(this.schemaRegistryUrl, "Schema Registry is not configured") + ); + return SchemaRegistryClientFactory.newClient(baseUrls, 0, providers, emptyMap(), null); + } +} diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java index 883a027b..276b075b 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java @@ -24,20 +24,13 @@ package com.bakdata.kafka; -import static java.util.Collections.emptyMap; - import com.bakdata.fluent_kafka_streams_tests.TestTopology; import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension; -import io.confluent.kafka.schemaregistry.SchemaProvider; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; import java.nio.file.Path; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.UUID; -import lombok.AccessLevel; +import lombok.Getter; +import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsConfig; @@ -45,43 +38,16 @@ /** * Class that provides helpers for using Fluent Kafka Streams Tests with {@link ConfiguredStreamsApp} */ -@RequiredArgsConstructor(access = AccessLevel.PRIVATE) 
+@RequiredArgsConstructor +@Getter public final class TestTopologyFactory { - private static final String MOCK_URL_PREFIX = "mock://"; private static final Map STREAMS_TEST_CONFIG = Map.of( // Disable caching to allow immediate aggregations StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, Long.toString(0L), ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(10_000) ); - private final String schemaRegistryUrl; - - /** - * Create a new {@code TestTopologyFactory} with no configured Schema Registry. - * @return {@code TestTopologyFactory} with no configured Schema Registry - */ - public static TestTopologyFactory withoutSchemaRegistry() { - return withSchemaRegistry(null); - } - - /** - * Create a new {@code TestTopologyFactory} with configured - * {@link io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry}. The scope is random in order to avoid - * collisions between different test instances as scopes are retained globally. - * @return {@code TestTopologyFactory} with configured Schema Registry - */ - public static TestTopologyFactory withSchemaRegistry() { - return withSchemaRegistry(MOCK_URL_PREFIX + UUID.randomUUID()); - } - - /** - * Create a new {@code TestTopologyFactory} with configured Schema Registry. - * @param schemaRegistryUrl Schema Registry URL to use - * @return {@code TestTopologyFactory} with configured Schema Registry - */ - public static TestTopologyFactory withSchemaRegistry(final String schemaRegistryUrl) { - return new TestTopologyFactory(schemaRegistryUrl); - } + private final @NonNull TestEnvironment environment; /** * Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and @@ -122,35 +88,6 @@ public static Map createStreamsTestConfig() { return STREAMS_TEST_CONFIG; } - /** - * Get Schema Registry URL if configured - * @return Schema Registry URL - * @throws NullPointerException if Schema Registry is not configured - */ - public String getSchemaRegistryUrl() { - return Objects.requireNonNull(this.schemaRegistryUrl, "Schema Registry is not configured"); - } - - /** - * Get {@code SchemaRegistryClient} for configured URL with default providers - * @return {@code SchemaRegistryClient} - * @throws NullPointerException if Schema Registry is not configured - */ - public SchemaRegistryClient getSchemaRegistryClient() { - return this.getSchemaRegistryClient(null); - } - - /** - * Get {@code SchemaRegistryClient} for configured URL - * @param providers list of {@code SchemaProvider} to use for {@code SchemaRegistryClient} - * @return {@code SchemaRegistryClient} - * @throws NullPointerException if Schema Registry is not configured - */ - public SchemaRegistryClient getSchemaRegistryClient(final List providers) { - return SchemaRegistryClientFactory.newClient(List.of(this.getSchemaRegistryUrl()), 0, providers, emptyMap(), - null); - } - /** * Create a {@code TestTopology} from a {@code ConfiguredStreamsApp}. It injects a {@link KafkaEndpointConfig} * for test purposes with Schema Registry optionally configured. @@ -194,7 +131,7 @@ public TestTopologyExtension createTopologyExtension( public Map getKafkaProperties(final ConfiguredStreamsApp app) { final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder() .bootstrapServers("localhost:9092") - .schemaRegistryUrl(this.schemaRegistryUrl) + .schemaRegistryUrl(this.environment.getSchemaRegistryUrl()) .build(); return app.getKafkaProperties(endpointConfig); }
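
The new streams-bootstrap-cli-test module adds TestApplicationHelper so that a KafkaStreamsApplication from the CLI module can be turned into a TestTopology without the static factories removed elsewhere in this patch. Below is a minimal usage sketch: the Mirror app is the test application already used by this patch, the topic names are placeholders, and the wildcard generics plus the start()/stop() lifecycle calls are assumptions about the Fluent Kafka Streams Tests API that the patch itself does not show.

```java
package com.bakdata.kafka;

import com.bakdata.fluent_kafka_streams_tests.TestTopology;
import com.bakdata.kafka.test_applications.Mirror;
import java.util.List;

class TestApplicationHelperSketch {

    void createTopologyForCliApp() {
        // no Schema Registry needed for the plain Mirror app
        final TestApplicationHelper helper =
                new TestApplicationHelper(TestEnvironment.withoutSchemaRegistry());
        try (final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(Mirror::new)) {
            app.setInputTopics(List.of("input"));   // hypothetical topic names
            app.setOutputTopic("output");
            // configure() applies the environment's Schema Registry URL, prepareRun() is invoked,
            // and the resulting ConfiguredStreamsApp is handed to a TestTopologyFactory
            final TestTopology<?, ?> topology = helper.createTopology(app);
            topology.start();
            // ... feed records through topology.input() and assert on topology.streamOutput() ...
            topology.stop();
        }
    }
}
```

In a JUnit 5 test, createTopologyExtension(app) with @RegisterExtension would take over the start/stop lifecycle instead.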
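
TestApplicationRunner complements the helper for integration tests against a real broker: it injects the bootstrap servers and optional Schema Registry URL that RunStreamsAppTest and StreamsCleanUpTest previously set by hand. A sketch of the intended call pattern, assuming the bootstrap servers come from a KafkaTest-style fixture; the app and topic names are placeholders.

```java
package com.bakdata.kafka;

import static com.bakdata.kafka.TestEnvironment.withoutSchemaRegistry;

import com.bakdata.kafka.test_applications.Mirror;
import java.util.List;

class TestApplicationRunnerSketch {

    void runAndCleanAgainstBroker(final String bootstrapServers) {
        final TestApplicationRunner runner =
                new TestApplicationRunner(bootstrapServers, withoutSchemaRegistry());
        try (final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(Mirror::new)) {
            app.setInputTopics(List.of("input"));   // hypothetical topic names
            app.setOutputTopic("output");
            // configures bootstrap servers (and Schema Registry, if any) on the app,
            // then starts the blocking run() on a background thread
            runner.run(app);
            // ... produce via runner.newTestClient() and assert on the output topic ...
            app.stop();
            // applies the same wiring, then invokes the application's clean-up hooks
            runner.clean(app);
        }
    }
}
```

The args-based overloads (run/clean/reset with a String[]) instead append --bootstrap-servers and --schema-registry-url to the CLI arguments before delegating to KafkaApplication.startApplicationWithoutExit.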
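
Finally, callers of the removed static factories on TestTopologyFactory migrate to the new TestEnvironment, which now owns the Schema Registry URL and client. A minimal before/after sketch based on the accessors visible in this patch:

```java
package com.bakdata.kafka;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

class TestEnvironmentMigrationSketch {

    void migrate() {
        // before this patch: TestTopologyFactory.withSchemaRegistry() and its
        // getSchemaRegistryUrl()/getSchemaRegistryClient() accessors

        // after: the environment is created explicitly and passed to the factory;
        // the mock Schema Registry scope is still randomized per environment to
        // avoid collisions between test instances
        final TestEnvironment environment = TestEnvironment.withSchemaRegistry();
        final TestTopologyFactory factory = new TestTopologyFactory(environment);

        // the Schema Registry accessors now live on the environment
        final String url = environment.getSchemaRegistryUrl();
        final SchemaRegistryClient client = environment.getSchemaRegistryClient();
    }
}
```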