diff --git a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaApplicationUtils.java b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaApplicationUtils.java index 1d7d6fd4..891c2172 100644 --- a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaApplicationUtils.java +++ b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageKafkaApplicationUtils.java @@ -39,7 +39,7 @@ public class LargeMessageKafkaApplicationUtils { * * @param kafkaProperties Kafka properties to create hook from * @return hook that cleans up LargeMessage files associated with a topic - * @see HasTopicHooks#registerTopicHook(HookFactory) + * @see HasTopicHooks#registerTopicHook(TopicHook) */ public static TopicHook createLargeMessageCleanUpHook(final Map kafkaProperties) { final AbstractLargeMessageConfig largeMessageConfig = new AbstractLargeMessageConfig(kafkaProperties); @@ -52,15 +52,4 @@ public void deleted(final String topic) { }; } - /** - * Register a hook that cleans up LargeMessage files associated with a topic. - * - * @param cleanUpRunner {@code CleanUpRunner} to register hook on - * @return self for chaining - * @see #createLargeMessageCleanUpHook(Map) - */ - public static T registerLargeMessageCleanUpHook(final HasTopicHooks cleanUpRunner) { - return cleanUpRunner.registerTopicHook( - LargeMessageKafkaApplicationUtils::createLargeMessageCleanUpHook); - } } diff --git a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageProducerApp.java b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageProducerApp.java index 9ceafbc0..bba7b301 100644 --- a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageProducerApp.java +++ b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageProducerApp.java @@ -30,9 +30,10 @@ public interface LargeMessageProducerApp extends ProducerApp { @Override - default ProducerCleanUpConfiguration setupCleanUp() { - final ProducerCleanUpConfiguration configurer = ProducerApp.super.setupCleanUp(); - return LargeMessageKafkaApplicationUtils.registerLargeMessageCleanUpHook(configurer); + default ProducerCleanUpConfiguration setupCleanUp(final ProducerSetupConfiguration configuration) { + final ProducerCleanUpConfiguration configurer = ProducerApp.super.setupCleanUp(configuration); + return configurer.registerTopicHook( + LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration.getKafkaProperties())); } } diff --git a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageStreamsApp.java b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageStreamsApp.java index 24109a69..edeb19a1 100644 --- a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageStreamsApp.java +++ b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageStreamsApp.java @@ -30,9 +30,10 @@ public interface LargeMessageStreamsApp extends StreamsApp { @Override - default StreamsCleanUpConfiguration setupCleanUp() { - final StreamsCleanUpConfiguration configurer = StreamsApp.super.setupCleanUp(); - return LargeMessageKafkaApplicationUtils.registerLargeMessageCleanUpHook(configurer); + default StreamsCleanUpConfiguration setupCleanUp(final StreamsSetupConfiguration configuration) { + final StreamsCleanUpConfiguration configurer = StreamsApp.super.setupCleanUp(configuration); + return configurer.registerTopicHook( + LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration.getKafkaProperties())); } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java index 5d51fb8a..82d91d02 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java @@ -146,11 +146,16 @@ public StreamsTopicConfig getTopics() { public ExecutableStreamsApp withEndpoint(final KafkaEndpointConfig endpointConfig) { final Map kafkaProperties = this.getKafkaProperties(endpointConfig); final Topology topology = this.createTopology(kafkaProperties); + final StreamsSetupConfiguration setupConfiguration = StreamsSetupConfiguration.builder() + .kafkaProperties(kafkaProperties) + .topics(this.getTopics()) + .build(); return ExecutableStreamsApp.builder() .topology(topology) .config(new StreamsConfig(kafkaProperties)) .app(this.app) - .setup(() -> this.setupApp(kafkaProperties)) + .setup(() -> this.app.setup(setupConfiguration)) + .setupCleanup(() -> this.app.setupCleanUp(setupConfiguration)) .build(); } @@ -174,12 +179,4 @@ public void close() { this.app.close(); } - private void setupApp(final Map kafkaProperties) { - final StreamsSetupConfiguration setupConfiguration = StreamsSetupConfiguration.builder() - .kafkaProperties(kafkaProperties) - .topics(this.getTopics()) - .build(); - this.app.setup(setupConfiguration); - } - } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java index eba32a73..31425b2c 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java @@ -48,7 +48,8 @@ public class ExecutableProducerApp */ @Override public ProducerCleanUpRunner createCleanUpRunner() { - final ProducerCleanUpConfiguration configurer = this.app.setupCleanUp(); + final ProducerSetupConfiguration configuration = this.createSetupConfiguration(); + final ProducerCleanUpConfiguration configurer = this.app.setupCleanUp(configuration); return ProducerCleanUpRunner.create(this.topics, this.kafkaProperties, configurer); } @@ -67,7 +68,8 @@ public ProducerRunner createRunner(final ProducerExecutionOptions options) { .topics(this.topics) .kafkaProperties(this.kafkaProperties) .build(); - this.setup(); + final ProducerSetupConfiguration configuration = this.createSetupConfiguration(); + this.app.setup(configuration); return new ProducerRunner(this.app.buildRunnable(producerBuilder)); } @@ -76,11 +78,14 @@ public void close() { this.app.close(); } - private void setup() { - final ProducerSetupConfiguration configuration = ProducerSetupConfiguration.builder() + private ProducerSetupConfiguration createSetupConfiguration() { + return ProducerSetupConfiguration.builder() .topics(this.topics) .kafkaProperties(this.kafkaProperties) .build(); + } + + private void setup(final ProducerSetupConfiguration configuration) { this.app.setup(configuration); } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java index 94593a5a..e0d90fcd 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import java.util.function.Supplier; import lombok.AccessLevel; import lombok.Builder; import lombok.Getter; @@ -46,8 +47,8 @@ public class ExecutableStreamsApp private final @NonNull StreamsConfig config; @Getter private final @NonNull T app; - @Builder.Default - private final @NonNull Runnable setup = () -> {}; + private final @NonNull Runnable setup; + private final @NonNull Supplier setupCleanup; /** * Create {@code StreamsCleanUpRunner} in order to clean application @@ -55,7 +56,7 @@ public class ExecutableStreamsApp */ @Override public StreamsCleanUpRunner createCleanUpRunner() { - final StreamsCleanUpConfiguration configurer = this.app.setupCleanUp(); + final StreamsCleanUpConfiguration configurer = this.setupCleanup.get(); return StreamsCleanUpRunner.create(this.topology, this.config, configurer); } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/HasCleanHook.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/HasCleanHook.java index 0cec9a85..d6238982 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/HasCleanHook.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/HasCleanHook.java @@ -32,9 +32,9 @@ public interface HasCleanHook { /** * Register a hook that is invoked when cleaning apps - * @param hookFactory factory to create hook from + * @param hook factory to create hook from * @return self for chaining */ - SELF registerCleanHook(HookFactory hookFactory); + SELF registerCleanHook(Runnable hook); } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/HasTopicHooks.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/HasTopicHooks.java index cfb95c3d..f3433e16 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/HasTopicHooks.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/HasTopicHooks.java @@ -32,10 +32,11 @@ public interface HasTopicHooks { /** * Register a hook that is invoked when performing actions on topics - * @param hookFactory factory to create {@link TopicHook} from + * + * @param hook Action to run. Topic is passed as parameter * @return self for chaining */ - SELF registerTopicHook(HookFactory hookFactory); + SELF registerTopicHook(TopicHook hook); /** * Hook for performing actions on topics diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/HookFactory.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/HookFactory.java deleted file mode 100644 index c54f60fc..00000000 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/HookFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2024 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.kafka; - -import java.util.Map; - -/** - * Factory to create a hook from Kafka config - */ -@FunctionalInterface -public interface HookFactory { - /** - * Create a new hook - * @param kafkaConfig Kafka configuration for creating hook - * @return hook - */ - T create(Map kafkaConfig); -} diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java index acd84376..4c6aeb1f 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerApp.java @@ -59,10 +59,11 @@ default Map createKafkaProperties() { /** * Configure clean up behavior + * @param configuration provides all runtime application configurations * @return {@code ProducerCleanUpConfiguration} * @see ProducerCleanUpRunner */ - default ProducerCleanUpConfiguration setupCleanUp() { + default ProducerCleanUpConfiguration setupCleanUp(final ProducerSetupConfiguration configuration) { return new ProducerCleanUpConfiguration(); } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpConfiguration.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpConfiguration.java index edf80d75..aa563091 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpConfiguration.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpConfiguration.java @@ -26,10 +26,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Map; -import java.util.stream.Collectors; -import lombok.AccessLevel; -import lombok.Builder; import lombok.NonNull; /** @@ -37,55 +33,32 @@ */ public class ProducerCleanUpConfiguration implements HasTopicHooks, HasCleanHook { - private final @NonNull Collection> topicDeletionHooks = new ArrayList<>(); - private final @NonNull Collection> cleanHooks = new ArrayList<>(); + private final @NonNull Collection topicHooks = new ArrayList<>(); + private final @NonNull Collection cleanHooks = new ArrayList<>(); /** * Register a hook that is executed whenever a topic has been deleted by the cleanup runner. - * - * @param hookFactory Action to run. Topic is passed as parameter - * @return this for chaining - * @see ProducerCleanUpRunner */ @Override - public ProducerCleanUpConfiguration registerTopicHook(final HookFactory hookFactory) { - this.topicDeletionHooks.add(hookFactory); + public ProducerCleanUpConfiguration registerTopicHook(final TopicHook hook) { + this.topicHooks.add(hook); return this; } /** * Register an action that is executed after {@link ProducerCleanUpRunner#clean()} has finished - * @param action Action to run - * @return this for chaining */ @Override - public ProducerCleanUpConfiguration registerCleanHook(final HookFactory action) { - this.cleanHooks.add(action); + public ProducerCleanUpConfiguration registerCleanHook(final Runnable hook) { + this.cleanHooks.add(hook); return this; } - ProducerCleanUpHooks create(final Map kafkaConfig) { - return ProducerCleanUpHooks.builder() - .topicHooks(this.topicDeletionHooks.stream() - .map(t -> t.create(kafkaConfig)) - .collect(Collectors.toList())) - .cleanHooks(this.cleanHooks.stream() - .map(c -> c.create(kafkaConfig)) - .collect(Collectors.toList())) - .build(); + void runCleanHooks() { + this.cleanHooks.forEach(Runnable::run); } - @Builder(access = AccessLevel.PRIVATE) - static class ProducerCleanUpHooks { - private final @NonNull Collection topicHooks; - private final @NonNull Collection cleanHooks; - - void runCleanHooks() { - this.cleanHooks.forEach(Runnable::run); - } - - void runTopicDeletionHooks(final String topic) { - this.topicHooks.forEach(hook -> hook.deleted(topic)); - } + void runTopicDeletionHooks(final String topic) { + this.topicHooks.forEach(hook -> hook.deleted(topic)); } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java index f14d5730..bfddf6d1 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import com.bakdata.kafka.ProducerCleanUpConfiguration.ProducerCleanUpHooks; import com.bakdata.kafka.util.ImprovedAdminClient; import java.util.Map; import lombok.AccessLevel; @@ -42,7 +41,7 @@ public final class ProducerCleanUpRunner implements CleanUpRunner { private final @NonNull ProducerTopicConfig topics; private final @NonNull Map kafkaProperties; - private final @NonNull ProducerCleanUpHooks cleanHooks; + private final @NonNull ProducerCleanUpConfiguration cleanHooks; /** * Create a new {@code ProducerCleanUpRunner} with default {@link ProducerCleanUpConfiguration} @@ -67,7 +66,7 @@ public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig to public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig topics, @NonNull final Map kafkaProperties, @NonNull final ProducerCleanUpConfiguration configuration) { - return new ProducerCleanUpRunner(topics, kafkaProperties, configuration.create(kafkaProperties)); + return new ProducerCleanUpRunner(topics, kafkaProperties, configuration); } /** diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerSetupConfiguration.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerSetupConfiguration.java index 2bc14bb3..8c3c748b 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerSetupConfiguration.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ProducerSetupConfiguration.java @@ -36,6 +36,7 @@ /** * Configuration for setting up a {@link ProducerApp} * @see ProducerApp#setup(ProducerSetupConfiguration) + * @see ProducerApp#setupCleanUp(ProducerSetupConfiguration) */ @Builder @Value diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java index 83fc3eb2..fb0d263c 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsApp.java @@ -68,10 +68,11 @@ default Map createKafkaProperties() { /** * Configure clean up behavior + * @param configuration provides all runtime application configurations * @return {@code StreamsCleanUpConfiguration} * @see StreamsCleanUpRunner */ - default StreamsCleanUpConfiguration setupCleanUp() { + default StreamsCleanUpConfiguration setupCleanUp(final StreamsSetupConfiguration configuration) { return new StreamsCleanUpConfiguration(); } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpConfiguration.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpConfiguration.java index d3c23494..c9186936 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpConfiguration.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpConfiguration.java @@ -26,10 +26,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Map; -import java.util.stream.Collectors; -import lombok.AccessLevel; -import lombok.Builder; import lombok.NonNull; /** @@ -37,74 +33,47 @@ */ public class StreamsCleanUpConfiguration implements HasTopicHooks, HasCleanHook { - private final @NonNull Collection> topicDeletionHooks = new ArrayList<>(); - private final @NonNull Collection> cleanHooks = new ArrayList<>(); - private final @NonNull Collection> resetHooks = new ArrayList<>(); + private final @NonNull Collection topicHooks = new ArrayList<>(); + private final @NonNull Collection cleanHooks = new ArrayList<>(); + private final @NonNull Collection resetHooks = new ArrayList<>(); /** * Register a hook that is executed whenever a topic has been deleted by the cleanup runner. - * - * @param hookFactory Action to run. Topic is passed as parameter - * @return this for chaining - * @see StreamsCleanUpRunner */ @Override - public StreamsCleanUpConfiguration registerTopicHook(final HookFactory hookFactory) { - this.topicDeletionHooks.add(hookFactory); + public StreamsCleanUpConfiguration registerTopicHook(final TopicHook hook) { + this.topicHooks.add(hook); return this; } /** * Register a hook that is executed after {@link StreamsCleanUpRunner#clean()} has finished - * @param hookFactory Action to run - * @return this for chaining */ @Override - public StreamsCleanUpConfiguration registerCleanHook(final HookFactory hookFactory) { - this.cleanHooks.add(hookFactory); + public StreamsCleanUpConfiguration registerCleanHook(final Runnable hook) { + this.cleanHooks.add(hook); return this; } /** * Register a hook that is executed after {@link StreamsCleanUpRunner#reset()} has finished - * @param hookFactory Action to run - * @return this for chaining + * @param hook factory to create hook from + * @return self for chaining */ - public StreamsCleanUpConfiguration registerResetHook(final HookFactory hookFactory) { - this.resetHooks.add(hookFactory); + public StreamsCleanUpConfiguration registerResetHook(final Runnable hook) { + this.resetHooks.add(hook); return this; } - StreamsCleanUpHooks create(final Map kafkaConfig) { - return StreamsCleanUpHooks.builder() - .topicHooks(this.topicDeletionHooks.stream() - .map(t -> t.create(kafkaConfig)) - .collect(Collectors.toList())) - .cleanHooks(this.cleanHooks.stream() - .map(c -> c.create(kafkaConfig)) - .collect(Collectors.toList())) - .cleanHooks(this.resetHooks.stream() - .map(c -> c.create(kafkaConfig)) - .collect(Collectors.toList())) - .build(); + void runCleanHooks() { + this.cleanHooks.forEach(Runnable::run); } - @Builder(access = AccessLevel.PRIVATE) - static class StreamsCleanUpHooks { - private final @NonNull Collection topicHooks; - private final @NonNull Collection cleanHooks; - private final @NonNull Collection resetHooks; - - void runCleanHooks() { - this.cleanHooks.forEach(Runnable::run); - } - - void runResetHooks() { - this.resetHooks.forEach(Runnable::run); - } + void runResetHooks() { + this.resetHooks.forEach(Runnable::run); + } - void runTopicDeletionHooks(final String topic) { - this.topicHooks.forEach(hook -> hook.deleted(topic)); - } + void runTopicDeletionHooks(final String topic) { + this.topicHooks.forEach(hook -> hook.deleted(topic)); } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java index c1dec6f6..7246d138 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import com.bakdata.kafka.StreamsCleanUpConfiguration.StreamsCleanUpHooks; import com.bakdata.kafka.util.ConsumerGroupClient; import com.bakdata.kafka.util.ImprovedAdminClient; import com.bakdata.kafka.util.TopologyInformation; @@ -58,7 +57,7 @@ public final class StreamsCleanUpRunner implements CleanUpRunner { private final TopologyInformation topologyInformation; private final Topology topology; private final @NonNull ImprovedStreamsConfig config; - private final @NonNull StreamsCleanUpHooks cleanHooks; + private final @NonNull StreamsCleanUpConfiguration cleanHooks; /** * Create a new {@code StreamsCleanUpRunner} with default {@link StreamsCleanUpConfiguration} @@ -84,8 +83,7 @@ public static StreamsCleanUpRunner create(final @NonNull Topology topology, final @NonNull StreamsConfig streamsConfig, final @NonNull StreamsCleanUpConfiguration configuration) { final ImprovedStreamsConfig config = new ImprovedStreamsConfig(streamsConfig); final TopologyInformation topologyInformation = new TopologyInformation(topology, config.getAppId()); - final StreamsCleanUpHooks hooks = configuration.create(config.getKafkaProperties()); - return new StreamsCleanUpRunner(topologyInformation, topology, config, hooks); + return new StreamsCleanUpRunner(topologyInformation, topology, config, configuration); } /** diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java index 3f5617bc..18d9821b 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableProducerAppTest.java @@ -114,7 +114,7 @@ public void setup(final ProducerSetupConfiguration configuration) { } @Override - public ProducerCleanUpConfiguration setupCleanUp() { + public ProducerCleanUpConfiguration setupCleanUp(final ProducerSetupConfiguration configuration) { return ExecutableProducerAppTest.this.setupCleanUp.get(); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java index 4c1ec030..486996c8 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/ExecutableStreamsAppTest.java @@ -118,7 +118,7 @@ public void setup(final StreamsSetupConfiguration configuration) { } @Override - public StreamsCleanUpConfiguration setupCleanUp() { + public StreamsCleanUpConfiguration setupCleanUp(final StreamsSetupConfiguration setupConfiguration) { return ExecutableStreamsAppTest.this.setupCleanUp.get(); } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java index ca143bea..33e31804 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java @@ -36,6 +36,7 @@ import com.bakdata.kafka.HasTopicHooks.TopicHook; import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerCleanUpConfiguration; +import com.bakdata.kafka.ProducerSetupConfiguration; import com.bakdata.kafka.ProducerTopicConfig; import com.bakdata.kafka.Runner; import com.bakdata.kafka.test_applications.AvroKeyProducer; @@ -156,9 +157,9 @@ void shouldCallCleanUpHookForAllTopics() { private ConfiguredProducerApp createCleanUpHookApplication() { return configureApp(new StringProducer() { @Override - public ProducerCleanUpConfiguration setupCleanUp() { - return super.setupCleanUp() - .registerTopicHook(p -> ProducerCleanUpRunnerTest.this.topicHook); + public ProducerCleanUpConfiguration setupCleanUp(final ProducerSetupConfiguration configuration) { + return super.setupCleanUp(configuration) + .registerTopicHook(ProducerCleanUpRunnerTest.this.topicHook); } }, ProducerTopicConfig.builder() .outputTopic("output") diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index 8ac88a56..da611219 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -40,6 +40,7 @@ import com.bakdata.kafka.StreamsCleanUpConfiguration; import com.bakdata.kafka.StreamsCleanUpRunner; import com.bakdata.kafka.StreamsRunner; +import com.bakdata.kafka.StreamsSetupConfiguration; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.test_applications.ComplexTopologyApplication; @@ -618,9 +619,9 @@ private ConfiguredStreamsApp createComplexCleanUpHookApplication() { this.kafkaCluster.createTopic(TopicConfig.withName(ComplexTopologyApplication.THROUGH_TOPIC).useDefaults()); return configureApp(new ComplexTopologyApplication() { @Override - public StreamsCleanUpConfiguration setupCleanUp() { - return super.setupCleanUp() - .registerTopicHook(p -> StreamsCleanUpRunnerTest.this.topicHook); + public StreamsCleanUpConfiguration setupCleanUp(final StreamsSetupConfiguration configuration) { + return super.setupCleanUp(configuration) + .registerTopicHook(StreamsCleanUpRunnerTest.this.topicHook); } }, StreamsTopicConfig.builder() .inputTopics(List.of("input"))