diff --git a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaApplicationUtils.java b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaApplicationUtils.java index 4c3a151f..df85de16 100644 --- a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaApplicationUtils.java +++ b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaApplicationUtils.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import com.bakdata.kafka.HasTopicHooks.TopicDeletionHook; import java.util.Map; import java.util.function.Consumer; import lombok.experimental.UtilityClass; @@ -41,7 +42,7 @@ public class LargeMessageKafkaApplicationUtils { * @return hook that cleans up LargeMessage files associated with a topic * @see StreamsCleanUpRunner#registerTopicDeletionHook(Consumer) */ - public static Consumer createLargeMessageCleanUpHook(final Map kafkaProperties) { + public static TopicDeletionHook createLargeMessageCleanUpHook(final Map kafkaProperties) { final AbstractLargeMessageConfig largeMessageConfig = new AbstractLargeMessageConfig(kafkaProperties); final LargeMessageStoringClient storer = largeMessageConfig.getStorer(); return storer::deleteAllFiles; @@ -54,22 +55,7 @@ public static Consumer createLargeMessageCleanUpHook(final Map kafkaProperties, - final StreamsCleanUpRunner cleanUpRunner) { - final Consumer deleteAllFiles = createLargeMessageCleanUpHook(kafkaProperties); - cleanUpRunner.registerTopicDeletionHook(deleteAllFiles); - } - - /** - * Register a hook that cleans up LargeMessage files associated with a topic. - * - * @param kafkaProperties Kafka properties to create hook from - * @param cleanUpRunner {@code ProducerCleanUpRunner} to register hook on - * @see #createLargeMessageCleanUpHook(Map) - */ - public static void registerLargeMessageCleanUpHook(final Map kafkaProperties, - final ProducerCleanUpRunner cleanUpRunner) { - final Consumer deleteAllFiles = createLargeMessageCleanUpHook(kafkaProperties); - cleanUpRunner.registerTopicDeletionHook(deleteAllFiles); + public static void registerLargeMessageCleanUpHook(final HasTopicHooks cleanUpRunner) { + cleanUpRunner.registerTopicDeletionHook(LargeMessageKafkaApplicationUtils::createLargeMessageCleanUpHook); } } diff --git a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaProducerApplication.java b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaProducerApplication.java index 2c756b95..ad40d94b 100644 --- a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaProducerApplication.java +++ b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaProducerApplication.java @@ -30,9 +30,9 @@ public interface LargeMessageKafkaProducerApplication extends ProducerApp { @Override - default void setupCleanUp(final ProducerCleanUpRunner cleanUpRunner) { - LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(cleanUpRunner.getKafkaProperties()); + default void setupCleanUp(final ProducerCleanUpConfigurer cleanUpRunner) { ProducerApp.super.setupCleanUp(cleanUpRunner); + LargeMessageKafkaApplicationUtils.registerLargeMessageCleanUpHook(cleanUpRunner); } } diff --git a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaStreamsApplication.java b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaStreamsApplication.java index adc20f2f..371dc59c 100644 --- a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaStreamsApplication.java +++ b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaStreamsApplication.java @@ -30,10 +30,9 @@ public interface LargeMessageKafkaStreamsApplication extends StreamsApp { @Override - default void setupCleanUp(final StreamsCleanUpRunner cleanUpRunner) { - LargeMessageKafkaApplicationUtils.registerLargeMessageCleanUpHook(cleanUpRunner.getKafkaProperties(), - cleanUpRunner); + default void setupCleanUp(final StreamsCleanUpConfigurer cleanUpRunner) { StreamsApp.super.setupCleanUp(cleanUpRunner); + LargeMessageKafkaApplicationUtils.registerLargeMessageCleanUpHook(cleanUpRunner); } } diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java index adc8a1cf..00801dd7 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import com.bakdata.kafka.ConfiguredProducerApp.ExecutableProducerApp; import java.util.Map; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -81,17 +82,21 @@ public ProducerTopicConfig createTopicConfig() { .build(); } - protected abstract ProducerApp createApp(); + public abstract ProducerApp createApp(); - private ProducerRunner createRunner() { - final ConfiguredProducerApp app = this.createConfiguredApp(); - final KafkaEndpointConfig endpointConfig = this.getEndpointConfig(); - return app.createRunner(endpointConfig); + public ProducerRunner createRunner() { + final ExecutableProducerApp executableApp = this.createExecutableApp(); + return executableApp.createRunner(); + } + + public ProducerCleanUpRunner createCleanUpRunner() { + final ExecutableProducerApp executableApp = this.createExecutableApp(); + return executableApp.createCleanUpRunner(); } - private ProducerCleanUpRunner createCleanUpRunner() { + private ExecutableProducerApp createExecutableApp() { final ConfiguredProducerApp app = this.createConfiguredApp(); final KafkaEndpointConfig endpointConfig = this.getEndpointConfig(); - return app.createCleanUpRunner(endpointConfig); + return app.withEndpoint(endpointConfig); } } diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 4ad87191..b8a395cc 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import com.bakdata.kafka.ConfiguredStreamsApp.ExecutableStreamsApp; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -74,15 +75,6 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement @ToString.Exclude private List runners = new ArrayList<>(); - public static void main(final String[] args) { - startApplication(new KafkaStreamsApplication() { - @Override - public StreamsApp createApp() { - return null; - } - }, args); - } - /** * Run the application. If Kafka Streams is run, this method blocks until Kafka Streams has completed shutdown, * either because it caught an error or the application has received a shutdown event. @@ -124,16 +116,14 @@ public StreamsExecutionOptions createExecutionOptions() { } public StreamsRunner createRunner() { - final ConfiguredStreamsApp configuredStreamsApp = this.createConfiguredApp(); - final KafkaEndpointConfig endpointConfig = this.getEndpointConfig(); + final ExecutableStreamsApp executableStreamsApp = this.createExecutableApp(); final StreamsExecutionOptions executionOptions = this.createExecutionOptions(); - return configuredStreamsApp.createRunner(endpointConfig, executionOptions); + return executableStreamsApp.createRunner(executionOptions); } public StreamsCleanUpRunner createCleanUpRunner() { - final ConfiguredStreamsApp configuredStreamsApp = this.createConfiguredApp(); - final KafkaEndpointConfig endpointConfig = this.getEndpointConfig(); - return configuredStreamsApp.createCleanUpRunner(endpointConfig); + final ExecutableStreamsApp executableApp = this.createExecutableApp(); + return executableApp.createCleanUpRunner(); } public ConfiguredStreamsApp createConfiguredApp() { @@ -165,6 +155,12 @@ public StreamsTopicConfig createTopicConfig() { .build(); } + public ExecutableStreamsApp createExecutableApp() { + final ConfiguredStreamsApp configuredStreamsApp = this.createConfiguredApp(); + final KafkaEndpointConfig endpointConfig = this.getEndpointConfig(); + return configuredStreamsApp.withEndpoint(endpointConfig); + } + private StreamsOptions createStreamsOptions() { return StreamsOptions.builder() .productive(this.productive) diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java index 2e3fb721..288044c5 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java @@ -33,7 +33,7 @@ public class SimpleKafkaProducerApplication extends KafkaProducerApplication { private final @NonNull Supplier appFactory; @Override - protected ProducerApp createApp() { + public ProducerApp createApp() { return this.appFactory.get(); } } diff --git a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/CliTest.java index 7275ddb0..39d6d584 100644 --- a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/CliTest.java @@ -56,7 +56,7 @@ void shouldExitWithSuccessCode() { public StreamsApp createApp() { return new StreamsApp() { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { throw new UnsupportedOperationException(); } @@ -84,7 +84,7 @@ public void run() { void shouldExitWithErrorCodeOnRunError() { KafkaApplication.startApplication(new SimpleKafkaStreamsApplication(() -> new StreamsApp() { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { throw new UnsupportedOperationException(); } @@ -108,7 +108,7 @@ void shouldExitWithErrorCodeOnCleanupError() { public StreamsApp createApp() { return new StreamsApp() { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { throw new UnsupportedOperationException(); } @@ -140,7 +140,7 @@ void shouldExitWithErrorCodeOnMissingBrokerParameter() { public StreamsApp createApp() { return new StreamsApp() { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { throw new UnsupportedOperationException(); } @@ -169,7 +169,7 @@ void shouldExitWithErrorInTopology() throws InterruptedException { try (final EmbeddedKafkaCluster kafkaCluster = provisionWith(defaultClusterConfig()); final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication(() -> new StreamsApp() { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { builder.streamInput(Consumed.with(Serdes.ByteArray(), Serdes.ByteArray())) .peek((k, v) -> { throw new RuntimeException(); @@ -202,7 +202,7 @@ void shouldExitWithSuccessCodeOnShutdown() throws InterruptedException { try (final EmbeddedKafkaCluster kafkaCluster = provisionWith(defaultClusterConfig()); final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication(() -> new StreamsApp() { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { builder.streamInput(Consumed.with(Serdes.ByteArray(), Serdes.ByteArray())) .to(builder.getTopics().getOutputTopic()); } @@ -242,7 +242,7 @@ void shouldExitWithErrorOnCleanupError() { public StreamsApp createApp() { return new StreamsApp() { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { throw new UnsupportedOperationException(); } @@ -268,7 +268,7 @@ void shouldParseArguments() { public StreamsApp createApp() { return new StreamsApp() { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { throw new UnsupportedOperationException(); } diff --git a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/test_applications/Mirror.java b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/test_applications/Mirror.java index 57813280..212c7611 100644 --- a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/test_applications/Mirror.java +++ b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/test_applications/Mirror.java @@ -33,7 +33,7 @@ @NoArgsConstructor public class Mirror implements StreamsApp { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { final KStream input = builder.streamInput(); input.to(builder.getTopics().getOutputTopic()); } diff --git a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/test_applications/WordCount.java b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/test_applications/WordCount.java index b81c0d87..2bcdc095 100644 --- a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/test_applications/WordCount.java +++ b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/test_applications/WordCount.java @@ -40,7 +40,7 @@ public class WordCount implements StreamsApp { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { final KStream textLines = builder.streamInput(); final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java index e011ff1b..f615ee5b 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java @@ -87,21 +87,31 @@ public Map getKafkaProperties(final KafkaEndpointConfig endpoint return kafkaConfig; } - public ProducerCleanUpRunner createCleanUpRunner(final KafkaEndpointConfig endpointConfig) { - final Map kafkaProperties = this.getKafkaProperties(endpointConfig); - final ProducerCleanUpRunner cleanUpRunner = - new ProducerCleanUpRunner(this.configuration.getTopics(), kafkaProperties); + public ExecutableProducerApp withEndpoint(final KafkaEndpointConfig endpointConfig) { + return new ExecutableProducerApp(endpointConfig); + } + + @RequiredArgsConstructor + public class ExecutableProducerApp { + private final @NonNull KafkaEndpointConfig endpointConfig; - this.app.setupCleanUp(cleanUpRunner); - return cleanUpRunner; + public ProducerCleanUpRunner createCleanUpRunner() { + final Map kafkaProperties = + ConfiguredProducerApp.this.getKafkaProperties(this.endpointConfig); + final ProducerCleanUpConfigurer configurer = new ProducerCleanUpConfigurer(); + ConfiguredProducerApp.this.app.setupCleanUp(configurer); + return ProducerCleanUpRunner.create(ConfiguredProducerApp.this.configuration.getTopics(), kafkaProperties, + configurer); } - public ProducerRunner createRunner(final KafkaEndpointConfig endpointConfig) { - final Map kafkaProperties = this.getKafkaProperties(endpointConfig); + public ProducerRunner createRunner() { + final Map kafkaProperties = + ConfiguredProducerApp.this.getKafkaProperties(this.endpointConfig); final ProducerBuilder producerBuilder = ProducerBuilder.builder() - .topics(this.configuration.getTopics()) + .topics(ConfiguredProducerApp.this.configuration.getTopics()) .kafkaProperties(kafkaProperties) .build(); - return new ProducerRunner(() -> this.app.run(producerBuilder)); + return new ProducerRunner(() -> ConfiguredProducerApp.this.app.run(producerBuilder)); + } } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java index 1817d790..6f577465 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java @@ -97,30 +97,8 @@ public Topology createTopology(final Map kafkaProperties) { return this.createTopology(kafkaProperties, false); } - public StreamsCleanUpRunner createCleanUpRunner(final KafkaEndpointConfig endpointConfig) { - final Map kafkaProperties = this.getKafkaProperties(endpointConfig); - final Topology topology = this.createTopology(kafkaProperties, true); - final StreamsCleanUpRunner cleanUpRunner = StreamsCleanUpRunner.create(topology, kafkaProperties); - cleanUpRunner.registerFinishHook(this.app::close); - - this.app.setupCleanUp(cleanUpRunner); - return cleanUpRunner; - } - - public StreamsRunner createRunner(final KafkaEndpointConfig endpointConfig) { - return this.createRunner(endpointConfig, StreamsExecutionOptions.builder().build()); - } - - public StreamsRunner createRunner(final KafkaEndpointConfig endpointConfig, - final StreamsExecutionOptions executionOptions) { - final Map kafkaProperties = this.getKafkaProperties(endpointConfig); - final Topology topology = this.createTopology(kafkaProperties); - return StreamsRunner.builder() - .topology(topology) - .config(new StreamsConfig(kafkaProperties)) - .executionOptions(executionOptions) - .hooks(this.createHooks()) - .build(); + public ExecutableStreamsApp withEndpoint(final KafkaEndpointConfig endpointConfig) { + return new ExecutableStreamsApp(endpointConfig); } /** @@ -134,18 +112,52 @@ private Topology createTopology(final Map kafkaProperties, final final TopologyBuilder topologyBuilder = TopologyBuilder.builder() .topics(this.configuration.getTopics()) .kafkaProperties(kafkaProperties) + .cleanUp(cleanUp) .build(); - this.app.buildTopology(topologyBuilder, cleanUp); + this.app.buildTopology(topologyBuilder); return topologyBuilder.build(); } - private StreamsHooks createHooks() { - return StreamsHooks.builder() - .stateListener(this.app.getStateListener()) - .uncaughtExceptionHandler(this.app.getUncaughtExceptionHandler()) - .onStart(this.app::onStreamsStart) - .onShutdown(this.app::close) - .build(); + @RequiredArgsConstructor + public class ExecutableStreamsApp { + + private final @NonNull KafkaEndpointConfig endpointConfig; + + public StreamsCleanUpRunner createCleanUpRunner() { + final Map kafkaProperties = + ConfiguredStreamsApp.this.getKafkaProperties(this.endpointConfig); + final Topology topology = ConfiguredStreamsApp.this.createTopology(kafkaProperties, true); + final StreamsCleanUpConfigurer configurer = new StreamsCleanUpConfigurer(); + ConfiguredStreamsApp.this.app.setupCleanUp(configurer); + configurer.registerFinishHook(ConfiguredStreamsApp.this.app::close); + return StreamsCleanUpRunner.create(topology, kafkaProperties, configurer); + } + + public StreamsRunner createRunner() { + return this.createRunner(StreamsExecutionOptions.builder().build()); + } + + public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions) { + final Map kafkaProperties = + ConfiguredStreamsApp.this.getKafkaProperties(this.endpointConfig); + final Topology topology = ConfiguredStreamsApp.this.createTopology(kafkaProperties); + return StreamsRunner.builder() + .topology(topology) + .config(new StreamsConfig(kafkaProperties)) + .executionOptions(executionOptions) + .hooks(this.createHooks()) + .build(); + } + + private StreamsHooks createHooks() { + return StreamsHooks.builder() + .stateListener(ConfiguredStreamsApp.this.app.getStateListener()) + .uncaughtExceptionHandler(ConfiguredStreamsApp.this.app.getUncaughtExceptionHandler()) + .onStart(ConfiguredStreamsApp.this.app::onStreamsStart) + .onShutdown(ConfiguredStreamsApp.this.app::close) + .build(); + } + } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsExecutor.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/HasTopicHooks.java similarity index 50% rename from streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsExecutor.java rename to streams-bootstrap/src/main/java/com/bakdata/kafka/HasTopicHooks.java index 77c63727..ea409ccb 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsExecutor.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/HasTopicHooks.java @@ -24,38 +24,18 @@ package com.bakdata.kafka; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; +import java.util.Map; -@RequiredArgsConstructor -public class StreamsExecutor { - private final @NonNull ConfiguredStreamsApp app; - private final @NonNull KafkaEndpointConfig endpointConfig; +public interface HasTopicHooks { + T registerTopicDeletionHook(TopicDeletionHookFactory cleanUpAction); - public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions) { - return this.app.createRunner(this.endpointConfig, executionOptions); + @FunctionalInterface + interface TopicDeletionHook { + void deleted(String topic); } - /** - * This method resets the offset for all input topics and deletes internal topics, application state, and optionally - * the output and error topic. - */ - public void clean() { - final StreamsCleanUpRunner cleanUpRunner = this.createCleanUpRunner(); - cleanUpRunner.clean(); + @FunctionalInterface + interface TopicDeletionHookFactory { + TopicDeletionHook create(Map kafkaConfig); } - - /** - * This method resets the offset for all input topics and deletes internal topics, application state, and optionally - * the output and error topic. - */ - public void reset() { - final StreamsCleanUpRunner cleanUpRunner = this.createCleanUpRunner(); - cleanUpRunner.reset(); - } - - private StreamsCleanUpRunner createCleanUpRunner() { - return this.app.createCleanUpRunner(this.endpointConfig); - } - } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java index c81cc884..2f7cb011 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java @@ -61,7 +61,7 @@ default Map createKafkaProperties() { return kafkaConfig; } - default void setupCleanUp(final ProducerCleanUpRunner cleanUpRunner) { + default void setupCleanUp(final ProducerCleanUpConfigurer cleanUpRunner) { // do nothing by default } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpConfigurer.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpConfigurer.java new file mode 100644 index 00000000..ddfb6694 --- /dev/null +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpConfigurer.java @@ -0,0 +1,80 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import com.bakdata.kafka.util.ImprovedAdminClient; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import lombok.AccessLevel; +import lombok.Builder; +import lombok.NonNull; + +public class ProducerCleanUpConfigurer implements HasTopicHooks { + private final @NonNull Collection topicDeletionHooks = new ArrayList<>(); + private final @NonNull Collection> cleanHooks = new ArrayList<>(); + + /** + * Register a hook that is executed whenever a topic has been deleted by the cleanup runner. + * + * @param cleanUpAction Action to run when a topic requires clean up. Topic is passed as parameter + * @return this for chaining + */ + @Override + public ProducerCleanUpConfigurer registerTopicDeletionHook(final TopicDeletionHookFactory cleanUpAction) { + this.topicDeletionHooks.add(cleanUpAction); + return this; + } + + public ProducerCleanUpConfigurer registerCleanHook(final Consumer action) { + this.cleanHooks.add(action); + return this; + } + + ProducerCleanUpHooks create(final Map kafkaConfig) { + return ProducerCleanUpHooks.builder() + .topicDeletionHooks(this.topicDeletionHooks.stream() + .map(t -> t.create(kafkaConfig)) + .collect(Collectors.toList())) + .cleanHooks(this.cleanHooks) + .build(); + } + + @Builder(access = AccessLevel.PRIVATE) + static class ProducerCleanUpHooks { + private final @NonNull Collection topicDeletionHooks; + private final @NonNull Collection> cleanHooks; + + public void runCleanHooks(final ImprovedAdminClient adminClient) { + this.cleanHooks.forEach(hook -> hook.accept(adminClient)); + } + + public void runTopicDeletionHooks(final String topic) { + this.topicDeletionHooks.forEach(hook -> hook.deleted(topic)); + } + } +} diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java index 27ed06fb..9f30f65a 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java @@ -24,12 +24,10 @@ package com.bakdata.kafka; +import com.bakdata.kafka.ProducerCleanUpConfigurer.ProducerCleanUpHooks; import com.bakdata.kafka.util.ImprovedAdminClient; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.Map; -import java.util.function.Consumer; +import lombok.AccessLevel; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -40,13 +38,17 @@ * Clean up the state and artifacts of your Kafka Streams app */ @Slf4j -@RequiredArgsConstructor +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) public final class ProducerCleanUpRunner { public static final int RESET_SLEEP_MS = 5000; private final @NonNull ProducerTopicConfig topics; private final @NonNull Map kafkaProperties; - private final @NonNull Collection> topicDeletionHooks = new ArrayList<>(); - private final @NonNull Collection> cleanHooks = new ArrayList<>(); + private final @NonNull ProducerCleanUpHooks cleanHooks; + + public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig topics, + @NonNull final Map kafkaProperties, @NonNull final ProducerCleanUpConfigurer cleanHooks) { + return new ProducerCleanUpRunner(topics, kafkaProperties, cleanHooks.create(kafkaProperties)); + } static void waitForCleanUp() { try { @@ -57,26 +59,6 @@ static void waitForCleanUp() { } } - public Map getKafkaProperties() { - return Collections.unmodifiableMap(this.kafkaProperties); - } - - /** - * Register a hook that is executed whenever a topic has been deleted by the cleanup runner. - * - * @param cleanUpAction Action to run when a topic requires clean up. Topic is passed as parameter - * @return this for chaining - */ - public ProducerCleanUpRunner registerTopicDeletionHook(final Consumer cleanUpAction) { - this.topicDeletionHooks.add(cleanUpAction); - return this; - } - - public ProducerCleanUpRunner registerCleanHook(final Consumer action) { - this.cleanHooks.add(action); - return this; - } - /** * Clean up your producer app by deleting the output topics. */ @@ -99,7 +81,7 @@ private class Task { private void clean() { this.deleteTopics(); - ProducerCleanUpRunner.this.cleanHooks.forEach(this::run); + ProducerCleanUpRunner.this.cleanHooks.runCleanHooks(this.adminClient); } private void deleteTopics() { @@ -107,18 +89,10 @@ private void deleteTopics() { outputTopics.forEach(this::deleteTopic); } - private void run(final Consumer hook) { - hook.accept(this.adminClient); - } - - private void runTopicDeletionHooks(final String topic) { - ProducerCleanUpRunner.this.topicDeletionHooks.forEach(hook -> hook.accept(topic)); - } - private void deleteTopic(final String topic) { this.adminClient.getSchemaTopicClient() .deleteTopicAndResetSchemaRegistry(topic); - this.runTopicDeletionHooks(topic); + ProducerCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic); } private Iterable getAllOutputTopics() { diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java index f1bfae97..51dde21f 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java @@ -70,7 +70,7 @@ default StreamsUncaughtExceptionHandler getUncaughtExceptionHandler() { * * @param builder builder to use for building the topology */ - void buildTopology(TopologyBuilder builder, boolean cleanUp); + void buildTopology(TopologyBuilder builder); /** * This must be set to a unique value for every application interacting with your kafka cluster to ensure internal @@ -113,7 +113,7 @@ default Map createKafkaProperties(final StreamsOptions options) return kafkaConfig; } - default void setupCleanUp(final StreamsCleanUpRunner cleanUpRunner) { + default void setupCleanUp(final StreamsCleanUpConfigurer cleanUpRunner) { // do nothing by default } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpConfigurer.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpConfigurer.java new file mode 100644 index 00000000..3ec95973 --- /dev/null +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpConfigurer.java @@ -0,0 +1,102 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import com.bakdata.kafka.util.ImprovedAdminClient; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import lombok.AccessLevel; +import lombok.Builder; +import lombok.NonNull; + +public class StreamsCleanUpConfigurer implements HasTopicHooks { + private final @NonNull Collection topicDeletionHooks = new ArrayList<>(); + private final @NonNull Collection> cleanHooks = new ArrayList<>(); + private final @NonNull Collection> resetHooks = new ArrayList<>(); + private final @NonNull Collection finishHooks = new ArrayList<>(); + + /** + * Register a hook that is executed whenever a topic has been deleted by the cleanup runner. + * + * @param cleanUpAction Action to run when a topic requires clean up. Topic is passed as parameter + * @return this for chaining + */ + @Override + public StreamsCleanUpConfigurer registerTopicDeletionHook(final TopicDeletionHookFactory cleanUpAction) { + this.topicDeletionHooks.add(cleanUpAction); + return this; + } + + public StreamsCleanUpConfigurer registerCleanHook(final Consumer action) { + this.cleanHooks.add(action); + return this; + } + + public StreamsCleanUpConfigurer registerResetHook(final Consumer action) { + this.resetHooks.add(action); + return this; + } + + public StreamsCleanUpConfigurer registerFinishHook(final Runnable action) { + this.finishHooks.add(action); + return this; + } + + StreamsCleanUpHooks create(final Map kafkaConfig) { + return StreamsCleanUpHooks.builder() + .topicDeletionHooks(this.topicDeletionHooks.stream() + .map(t -> t.create(kafkaConfig)) + .collect(Collectors.toList())) + .cleanHooks(this.cleanHooks) + .build(); + } + + @Builder(access = AccessLevel.PRIVATE) + static class StreamsCleanUpHooks { + private final @NonNull Collection topicDeletionHooks; + private final @NonNull Collection> cleanHooks; + private final @NonNull Collection> resetHooks; + private final @NonNull Collection finishHooks; + + public void runCleanHooks(final ImprovedAdminClient adminClient) { + this.cleanHooks.forEach(hook -> hook.accept(adminClient)); + } + + public void runResetHooks(final ImprovedAdminClient adminClient) { + this.resetHooks.forEach(hook -> hook.accept(adminClient)); + } + + public void runFinishHooks() { + this.finishHooks.forEach(Runnable::run); + } + + public void runTopicDeletionHooks(final String topic) { + this.topicDeletionHooks.forEach(hook -> hook.deleted(topic)); + } + } +} diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java index 9c7cec02..b30fffe0 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java @@ -26,6 +26,7 @@ import static com.bakdata.kafka.ProducerCleanUpRunner.waitForCleanUp; +import com.bakdata.kafka.StreamsCleanUpConfigurer.StreamsCleanUpHooks; import com.bakdata.kafka.util.ConsumerGroupClient; import com.bakdata.kafka.util.ImprovedAdminClient; import com.bakdata.kafka.util.TopologyInformation; @@ -34,12 +35,10 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.Files; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.function.Consumer; import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.NonNull; @@ -61,16 +60,14 @@ public final class StreamsCleanUpRunner { private final TopologyInformation topologyInformation; private final Topology topology; private final @NonNull StreamsAppConfig appConfig; - private final @NonNull Collection> topicDeletionHooks = new ArrayList<>(); - private final @NonNull Collection> cleanHooks = new ArrayList<>(); - private final @NonNull Collection> resetHooks = new ArrayList<>(); - private final @NonNull Collection finishHooks = new ArrayList<>(); + private final @NonNull StreamsCleanUpHooks cleanHooks; public static StreamsCleanUpRunner create(final @NonNull Topology topology, - final @NonNull Map kafkaProperties) { + final @NonNull Map kafkaProperties, final @NonNull StreamsCleanUpConfigurer cleanHooks) { final StreamsAppConfig streamsAppConfig = new StreamsAppConfig(kafkaProperties); final TopologyInformation topologyInformation = new TopologyInformation(topology, streamsAppConfig.getAppId()); - return new StreamsCleanUpRunner(topologyInformation, topology, streamsAppConfig); + return new StreamsCleanUpRunner(topologyInformation, topology, streamsAppConfig, + cleanHooks.create(kafkaProperties)); } /** @@ -149,32 +146,6 @@ private static Collection filterExistingTopics(final Collection .collect(Collectors.toList()); } - /** - * Register a hook that is executed whenever a topic has been deleted by the cleanup runner. - * - * @param action Action to run when a topic requires clean up. Topic is passed as parameter - * @return this for chaining - */ - public StreamsCleanUpRunner registerTopicDeletionHook(final Consumer action) { - this.topicDeletionHooks.add(action); - return this; - } - - public StreamsCleanUpRunner registerCleanHook(final Consumer action) { - this.cleanHooks.add(action); - return this; - } - - public StreamsCleanUpRunner registerResetHook(final Consumer action) { - this.resetHooks.add(action); - return this; - } - - public StreamsCleanUpRunner registerFinishHook(final Runnable action) { - this.finishHooks.add(action); - return this; - } - /** * Clean up your Streams app by resetting the app, deleting local state and deleting the output topics * and consumer group. @@ -183,7 +154,7 @@ public void clean() { try (final ImprovedAdminClient adminClient = this.createAdminClient()) { final Task task = new Task(adminClient); task.cleanAndReset(); - this.finish(); + this.cleanHooks.runFinishHooks(); waitForCleanUp(); } } @@ -195,15 +166,11 @@ public void reset() { try (final ImprovedAdminClient adminClient = this.createAdminClient()) { final Task task = new Task(adminClient); task.reset(); - this.finish(); + this.cleanHooks.runFinishHooks(); waitForCleanUp(); } } - private void finish() { - this.finishHooks.forEach(Runnable::run); - } - private ImprovedAdminClient createAdminClient() { return ImprovedAdminClient.create(this.getKafkaProperties()); } @@ -226,7 +193,7 @@ private void reset() { try (final KafkaStreams kafkaStreams = this.createStreams()) { kafkaStreams.cleanUp(); } - StreamsCleanUpRunner.this.resetHooks.forEach(this::run); + StreamsCleanUpRunner.this.cleanHooks.runResetHooks(this.adminClient); } private KafkaStreams createStreams() { @@ -242,7 +209,7 @@ private void cleanAndReset() { private void clean() { this.deleteTopics(); this.deleteConsumerGroup(); - StreamsCleanUpRunner.this.cleanHooks.forEach(this::run); + StreamsCleanUpRunner.this.cleanHooks.runCleanHooks(this.adminClient); } /** @@ -253,24 +220,16 @@ private void deleteTopics() { externalTopics.forEach(this::deleteTopic); } - private void run(final Consumer hook) { - hook.accept(this.adminClient); - } - private void resetInternalTopic(final String topic) { this.adminClient.getSchemaTopicClient() .resetSchemaRegistry(topic); - this.runTopicDeletionHooks(topic); - } - - private void runTopicDeletionHooks(final String topic) { - StreamsCleanUpRunner.this.topicDeletionHooks.forEach(hook -> hook.accept(topic)); + StreamsCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic); } private void deleteTopic(final String topic) { this.adminClient.getSchemaTopicClient() .deleteTopicAndResetSchemaRegistry(topic); - this.runTopicDeletionHooks(topic); + StreamsCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic); } private void deleteConsumerGroup() { diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsExecutionOptions.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsExecutionOptions.java index 23aaef4d..bd2baa42 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsExecutionOptions.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsExecutionOptions.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import java.time.Duration; import lombok.Builder; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.KafkaStreams.CloseOptions; @@ -33,6 +34,8 @@ public class StreamsExecutionOptions { @Builder.Default private final boolean volatileGroupInstanceId = true; + @Builder.Default + private final Duration closeTimeout = Duration.ofMillis(Long.MAX_VALUE); private static boolean isStaticMembershipDisabled(final StreamsConfig config) { return config.originals().get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG) == null; @@ -41,6 +44,6 @@ private static boolean isStaticMembershipDisabled(final StreamsConfig config) { public CloseOptions createCloseOptions(final StreamsConfig config) { final boolean staticMembershipDisabled = isStaticMembershipDisabled(config); final boolean leaveGroup = staticMembershipDisabled || this.volatileGroupInstanceId; - return new CloseOptions().leaveGroup(leaveGroup); + return new CloseOptions().leaveGroup(leaveGroup).timeout(this.closeTimeout); } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java index 62b7ff7c..8910bcb1 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java @@ -42,6 +42,7 @@ public class TopologyBuilder { StreamsBuilder streamsBuilder = new StreamsBuilder(); @NonNull StreamsTopicConfig topics; @NonNull Map kafkaProperties; + boolean cleanUp; public KStream streamInput(final Consumed consumed) { return this.streamsBuilder.stream(this.topics.getInputTopics(), consumed); diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java index dbea6e49..4b616d67 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java @@ -82,7 +82,7 @@ void shouldSetDefaultStringSerdeWhenSchemaRegistryUrlIsNotSet() { private static class TestApplication implements StreamsApp { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { throw new UnsupportedOperationException(); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java index 717dc31a..45897d51 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/ComplexTopologyApplication.java @@ -48,7 +48,7 @@ public class ComplexTopologyApplication implements StreamsApp { public static final String THROUGH_TOPIC = "through-topic"; @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { final KStream input = builder.streamInput(); input.to(THROUGH_TOPIC); diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/ExtraInputTopics.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/ExtraInputTopics.java index 584cb43a..88c7d0e3 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/ExtraInputTopics.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/ExtraInputTopics.java @@ -33,7 +33,7 @@ @NoArgsConstructor public class ExtraInputTopics implements StreamsApp { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { final KStream input = builder.streamInput("role"); input.to(builder.getTopics().getOutputTopic()); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/Mirror.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/Mirror.java index 57813280..212c7611 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/Mirror.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/Mirror.java @@ -33,7 +33,7 @@ @NoArgsConstructor public class Mirror implements StreamsApp { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { final KStream input = builder.streamInput(); input.to(builder.getTopics().getOutputTopic()); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java index f2421882..33aa61ef 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorKeyWithAvro.java @@ -39,7 +39,7 @@ @NoArgsConstructor public class MirrorKeyWithAvro implements StreamsApp { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { final KStream input = builder.streamInput(); input.to(builder.getTopics().getOutputTopic()); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java index bdf085e1..5ecf5bae 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorValueWithAvro.java @@ -39,7 +39,7 @@ @NoArgsConstructor public class MirrorValueWithAvro implements StreamsApp { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { final KStream input = builder.streamInput(); input.to(builder.getTopics().getOutputTopic()); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java index 78bd9041..8daed44f 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/MirrorWithNonDefaultSerde.java @@ -42,7 +42,7 @@ @NoArgsConstructor public class MirrorWithNonDefaultSerde implements StreamsApp { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { final Serde valueSerde = this.getValueSerde(builder.getKafkaProperties()); final KStream input = builder.streamInput(Consumed.with(null, valueSerde)); diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/WordCount.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/WordCount.java index b81c0d87..2bcdc095 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/WordCount.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/WordCount.java @@ -40,7 +40,7 @@ public class WordCount implements StreamsApp { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { final KStream textLines = builder.streamInput(); final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/WordCountPattern.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/WordCountPattern.java index 0f6cfc90..9c86b0b8 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/WordCountPattern.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/test_applications/WordCountPattern.java @@ -41,7 +41,7 @@ public class WordCountPattern implements StreamsApp { @Override - public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { + public void buildTopology(final TopologyBuilder builder) { final KStream textLines = builder.streamInputPattern(); final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);