From fa9a1f1d2ec17271cc83f5b092823f1b77088f3e Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Wed, 3 Apr 2024 16:16:28 +0200 Subject: [PATCH] Create v3 --- gradle.properties | 1 + .../kafka/KafkaProducerApplication.java | 13 ++-- .../kafka/KafkaStreamsApplication.java | 19 ++--- .../kafka/SimpleKafkaStreamsApplication.java | 2 +- .../test/java/com/bakdata/kafka/CliTest.java | 10 +-- .../StreamsBootstrapTopologyFactory.java | 12 ++-- .../bakdata/kafka/ConfiguredProducerApp.java | 33 +++------ .../bakdata/kafka/ConfiguredStreamsApp.java | 68 ++++-------------- .../bakdata/kafka/ExecutableProducerApp.java | 52 ++++++++++++++ .../bakdata/kafka/ExecutableStreamsApp.java | 71 +++++++++++++++++++ .../com/bakdata/kafka/StreamsAppConfig.java | 8 +-- .../bakdata/kafka/StreamsCleanUpRunner.java | 4 +- .../com/bakdata/kafka/TopologyBuilder.java | 1 - .../com/bakdata/kafka/AvroMirrorTest.java | 12 ++-- .../java/com/bakdata/kafka/CloseFlagApp.java | 3 +- .../kafka/ConfiguredProducerAppTest.java | 12 ++-- .../kafka/ConfiguredStreamsAppTest.java | 12 ++-- .../kafka/integration/StreamsCleanUpTest.java | 61 ++++++++++------ .../kafka/util/TopologyInformationTest.java | 4 +- 19 files changed, 239 insertions(+), 159 deletions(-) create mode 100644 streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java create mode 100644 streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java diff --git a/gradle.properties b/gradle.properties index 4a260527..8f3cd5fe 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,6 @@ version=2.17.1-SNAPSHOT org.gradle.caching=true +# running Kafka JUnit in parallel causes problems org.gradle.parallel=false kafkaVersion=3.6.1 kafkaJunitVersion=3.6.0 diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java index 00801dd7..37659d38 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import com.bakdata.kafka.ConfiguredProducerApp.ExecutableProducerApp; import java.util.Map; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -60,10 +59,10 @@ public void clean() { cleanUpRunner.clean(); } - public ConfiguredProducerApp createConfiguredApp() { + public ConfiguredProducerApp createConfiguredApp() { final ProducerApp producerApp = this.createApp(); final ProducerAppConfiguration configuration = this.createConfiguration(); - return new ConfiguredProducerApp(producerApp, configuration); + return new ConfiguredProducerApp(producerApp, configuration); } public ProducerAppConfiguration createConfiguration() { @@ -85,17 +84,17 @@ public ProducerTopicConfig createTopicConfig() { public abstract ProducerApp createApp(); public ProducerRunner createRunner() { - final ExecutableProducerApp executableApp = this.createExecutableApp(); + final ExecutableProducerApp executableApp = this.createExecutableApp(); return executableApp.createRunner(); } public ProducerCleanUpRunner createCleanUpRunner() { - final ExecutableProducerApp executableApp = this.createExecutableApp(); + final ExecutableProducerApp executableApp = this.createExecutableApp(); return executableApp.createCleanUpRunner(); } - private ExecutableProducerApp createExecutableApp() { - final ConfiguredProducerApp app = this.createConfiguredApp(); + private ExecutableProducerApp createExecutableApp() { + final ConfiguredProducerApp app = this.createConfiguredApp(); final KafkaEndpointConfig endpointConfig = this.getEndpointConfig(); return app.withEndpoint(endpointConfig); } diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index b17754e1..8614f3af 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import com.bakdata.kafka.ConfiguredStreamsApp.ExecutableStreamsApp; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -74,6 +73,8 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.") private boolean volatileGroupInstanceId; @ToString.Exclude + // ConcurrentLinkedDeque required because calling #close() causes asynchronous #run() calls to finish and thus + // concurrently iterating on #runners and removing from #runners private ConcurrentLinkedDeque runners = new ConcurrentLinkedDeque<>(); /** @@ -108,7 +109,7 @@ public void reset() { runner.reset(); } - public abstract StreamsApp createApp(); + public abstract StreamsApp createApp(boolean cleanUp); public StreamsExecutionOptions createExecutionOptions() { return StreamsExecutionOptions.builder() @@ -117,20 +118,20 @@ public StreamsExecutionOptions createExecutionOptions() { } public StreamsRunner createRunner() { - final ExecutableStreamsApp executableStreamsApp = this.createExecutableApp(); + final ExecutableStreamsApp executableStreamsApp = this.createExecutableApp(false); final StreamsExecutionOptions executionOptions = this.createExecutionOptions(); return executableStreamsApp.createRunner(executionOptions); } public StreamsCleanUpRunner createCleanUpRunner() { - final ExecutableStreamsApp executableApp = this.createExecutableApp(); + final ExecutableStreamsApp executableApp = this.createExecutableApp(true); return executableApp.createCleanUpRunner(); } - public ConfiguredStreamsApp createConfiguredApp() { - final StreamsApp streamsApp = this.createApp(); + public ConfiguredStreamsApp createConfiguredApp(final boolean cleanUp) { + final StreamsApp streamsApp = this.createApp(cleanUp); final StreamsAppConfiguration streamsAppConfiguration = this.createConfiguration(); - return new ConfiguredStreamsApp(streamsApp, streamsAppConfiguration); + return new ConfiguredStreamsApp(streamsApp, streamsAppConfiguration); } public StreamsAppConfiguration createConfiguration() { @@ -156,8 +157,8 @@ public StreamsTopicConfig createTopicConfig() { .build(); } - public ExecutableStreamsApp createExecutableApp() { - final ConfiguredStreamsApp configuredStreamsApp = this.createConfiguredApp(); + public ExecutableStreamsApp createExecutableApp(final boolean cleanUp) { + final ConfiguredStreamsApp configuredStreamsApp = this.createConfiguredApp(cleanUp); final KafkaEndpointConfig endpointConfig = this.getEndpointConfig(); return configuredStreamsApp.withEndpoint(endpointConfig); } diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java index 5b6646fa..0abb9f39 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java @@ -34,7 +34,7 @@ public class SimpleKafkaStreamsApplication extends KafkaStreamsApplication { private final @NonNull Supplier appFactory; @Override - public StreamsApp createApp() { + public StreamsApp createApp(final boolean cleanUp) { return this.appFactory.get(); } } diff --git a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/CliTest.java index 39d6d584..305a4b45 100644 --- a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/CliTest.java @@ -53,7 +53,7 @@ private static void runApp(final KafkaStreamsApplication app, final String... ar void shouldExitWithSuccessCode() { KafkaApplication.startApplication(new KafkaStreamsApplication() { @Override - public StreamsApp createApp() { + public StreamsApp createApp(final boolean cleanUp) { return new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -105,7 +105,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { void shouldExitWithErrorCodeOnCleanupError() { KafkaApplication.startApplication(new KafkaStreamsApplication() { @Override - public StreamsApp createApp() { + public StreamsApp createApp(final boolean cleanUp) { return new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -137,7 +137,7 @@ public void clean() { void shouldExitWithErrorCodeOnMissingBrokerParameter() { KafkaApplication.startApplication(new KafkaStreamsApplication() { @Override - public StreamsApp createApp() { + public StreamsApp createApp(final boolean cleanUp) { return new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -239,7 +239,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { void shouldExitWithErrorOnCleanupError() { KafkaApplication.startApplication(new KafkaStreamsApplication() { @Override - public StreamsApp createApp() { + public StreamsApp createApp(final boolean cleanUp) { return new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -265,7 +265,7 @@ public String getUniqueAppId(final StreamsTopicConfig topics) { void shouldParseArguments() { final KafkaStreamsApplication app = new KafkaStreamsApplication() { @Override - public StreamsApp createApp() { + public StreamsApp createApp(final boolean cleanUp) { return new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/StreamsBootstrapTopologyFactory.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/StreamsBootstrapTopologyFactory.java index e6ad8d56..12040beb 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/StreamsBootstrapTopologyFactory.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/StreamsBootstrapTopologyFactory.java @@ -49,7 +49,7 @@ public class StreamsBootstrapTopologyFactory { * @see ConfiguredStreamsApp#createTopology(Map) */ public static TestTopology createTopologyWithSchemaRegistry( - final ConfiguredStreamsApp app) { + final ConfiguredStreamsApp app) { return new TestTopology<>(app::createTopology, getKafkaPropertiesWithSchemaRegistryUrl(app)); } @@ -66,7 +66,7 @@ public static TestTopology createTopologyWithSchemaRegistry( * @see ConfiguredStreamsApp#createTopology(Map) */ public static TestTopologyExtension createTopologyExtensionWithSchemaRegistry( - final ConfiguredStreamsApp app) { + final ConfiguredStreamsApp app) { return new TestTopologyExtension<>(app::createTopology, getKafkaPropertiesWithSchemaRegistryUrl(app)); } @@ -81,7 +81,7 @@ public static TestTopologyExtension createTopologyExtensionWithSche * @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig) * @see ConfiguredStreamsApp#createTopology(Map) */ - public static TestTopology createTopology(final ConfiguredStreamsApp app) { + public static TestTopology createTopology(final ConfiguredStreamsApp app) { return new TestTopology<>(app::createTopology, getKafkaProperties(app)); } @@ -98,7 +98,7 @@ public static TestTopology createTopology(final ConfiguredStreamsAp * @see ConfiguredStreamsApp#createTopology(Map) */ public static TestTopologyExtension createTopologyExtension( - final ConfiguredStreamsApp app) { + final ConfiguredStreamsApp app) { return new TestTopologyExtension<>(app::createTopology, getKafkaProperties(app)); } @@ -111,7 +111,7 @@ public static TestTopologyExtension createTopologyExtension( * @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig) */ public static Function> getKafkaPropertiesWithSchemaRegistryUrl( - final ConfiguredStreamsApp app) { + final ConfiguredStreamsApp app) { return schemaRegistryUrl -> { final KafkaEndpointConfig endpointConfig = newEndpointConfig() .schemaRegistryUrl(schemaRegistryUrl) @@ -120,7 +120,7 @@ public static Function> getKafkaPropertiesWithSchema }; } - private static Map getKafkaProperties(final ConfiguredStreamsApp app) { + private static Map getKafkaProperties(final ConfiguredStreamsApp app) { final KafkaEndpointConfig endpointConfig = createEndpointConfig(); return app.getKafkaProperties(endpointConfig); } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java index f615ee5b..eeee4ee1 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java @@ -27,14 +27,16 @@ import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import java.util.HashMap; import java.util.Map; +import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; @RequiredArgsConstructor -public class ConfiguredProducerApp { - private final @NonNull ProducerApp app; +public class ConfiguredProducerApp { + @Getter + private final @NonNull T app; private final @NonNull ProducerAppConfiguration configuration; private static Map createKafkaProperties(final KafkaEndpointConfig endpointConfig) { @@ -87,31 +89,12 @@ public Map getKafkaProperties(final KafkaEndpointConfig endpoint return kafkaConfig; } - public ExecutableProducerApp withEndpoint(final KafkaEndpointConfig endpointConfig) { - return new ExecutableProducerApp(endpointConfig); + public ExecutableProducerApp withEndpoint(final KafkaEndpointConfig endpointConfig) { + return new ExecutableProducerApp<>(this.getTopics(), this.getKafkaProperties(endpointConfig), this.app); } - @RequiredArgsConstructor - public class ExecutableProducerApp { - private final @NonNull KafkaEndpointConfig endpointConfig; - - public ProducerCleanUpRunner createCleanUpRunner() { - final Map kafkaProperties = - ConfiguredProducerApp.this.getKafkaProperties(this.endpointConfig); - final ProducerCleanUpConfigurer configurer = new ProducerCleanUpConfigurer(); - ConfiguredProducerApp.this.app.setupCleanUp(configurer); - return ProducerCleanUpRunner.create(ConfiguredProducerApp.this.configuration.getTopics(), kafkaProperties, - configurer); + public ProducerTopicConfig getTopics() { + return this.configuration.getTopics(); } - public ProducerRunner createRunner() { - final Map kafkaProperties = - ConfiguredProducerApp.this.getKafkaProperties(this.endpointConfig); - final ProducerBuilder producerBuilder = ProducerBuilder.builder() - .topics(ConfiguredProducerApp.this.configuration.getTopics()) - .kafkaProperties(kafkaProperties) - .build(); - return new ProducerRunner(() -> ConfiguredProducerApp.this.app.run(producerBuilder)); - } - } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java index 6f577465..664ed24f 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java @@ -27,6 +27,7 @@ import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import java.util.HashMap; import java.util.Map; +import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.apache.kafka.common.serialization.Serdes.StringSerde; @@ -34,8 +35,9 @@ import org.apache.kafka.streams.Topology; @RequiredArgsConstructor -public class ConfiguredStreamsApp { - private final @NonNull StreamsApp app; +public class ConfiguredStreamsApp { + @Getter + private final @NonNull T app; private final @NonNull StreamsAppConfiguration configuration; private static Map createKafkaProperties(final KafkaEndpointConfig endpointConfig) { @@ -89,75 +91,33 @@ public Map getKafkaProperties(final KafkaEndpointConfig endpoint kafkaConfig.putAll(this.configuration.getKafkaConfig()); kafkaConfig.putAll(EnvironmentKafkaConfigParser.parseVariables(System.getenv())); kafkaConfig.putAll(endpointConfig.createKafkaProperties()); - kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, this.app.getUniqueAppId(this.configuration.getTopics())); + kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, this.app.getUniqueAppId(this.getTopics())); return kafkaConfig; } - public Topology createTopology(final Map kafkaProperties) { - return this.createTopology(kafkaProperties, false); + public StreamsTopicConfig getTopics() { + return this.configuration.getTopics(); } - public ExecutableStreamsApp withEndpoint(final KafkaEndpointConfig endpointConfig) { - return new ExecutableStreamsApp(endpointConfig); + public ExecutableStreamsApp withEndpoint(final KafkaEndpointConfig endpointConfig) { + final Map kafkaProperties = this.getKafkaProperties(endpointConfig); + final Topology topology = this.createTopology(kafkaProperties); + return new ExecutableStreamsApp(topology, new StreamsConfig(kafkaProperties), this.app); } /** * Create the topology of the Kafka Streams app * - * @return topology of the Kafka Streams app * @param kafkaProperties configuration that should be used by clients to configure Kafka utilities - * @param cleanUp whether topology is created in cleanUp context + * @return topology of the Kafka Streams app */ - private Topology createTopology(final Map kafkaProperties, final boolean cleanUp) { + public Topology createTopology(final Map kafkaProperties) { final TopologyBuilder topologyBuilder = TopologyBuilder.builder() - .topics(this.configuration.getTopics()) + .topics(this.getTopics()) .kafkaProperties(kafkaProperties) - .cleanUp(cleanUp) .build(); this.app.buildTopology(topologyBuilder); return topologyBuilder.build(); } - @RequiredArgsConstructor - public class ExecutableStreamsApp { - - private final @NonNull KafkaEndpointConfig endpointConfig; - - public StreamsCleanUpRunner createCleanUpRunner() { - final Map kafkaProperties = - ConfiguredStreamsApp.this.getKafkaProperties(this.endpointConfig); - final Topology topology = ConfiguredStreamsApp.this.createTopology(kafkaProperties, true); - final StreamsCleanUpConfigurer configurer = new StreamsCleanUpConfigurer(); - ConfiguredStreamsApp.this.app.setupCleanUp(configurer); - configurer.registerFinishHook(ConfiguredStreamsApp.this.app::close); - return StreamsCleanUpRunner.create(topology, kafkaProperties, configurer); - } - - public StreamsRunner createRunner() { - return this.createRunner(StreamsExecutionOptions.builder().build()); - } - - public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions) { - final Map kafkaProperties = - ConfiguredStreamsApp.this.getKafkaProperties(this.endpointConfig); - final Topology topology = ConfiguredStreamsApp.this.createTopology(kafkaProperties); - return StreamsRunner.builder() - .topology(topology) - .config(new StreamsConfig(kafkaProperties)) - .executionOptions(executionOptions) - .hooks(this.createHooks()) - .build(); - } - - private StreamsHooks createHooks() { - return StreamsHooks.builder() - .stateListener(ConfiguredStreamsApp.this.app.getStateListener()) - .uncaughtExceptionHandler(ConfiguredStreamsApp.this.app.getUncaughtExceptionHandler()) - .onStart(ConfiguredStreamsApp.this.app::onStreamsStart) - .onShutdown(ConfiguredStreamsApp.this.app::close) - .build(); - } - - } - } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java new file mode 100644 index 00000000..7bc87bb6 --- /dev/null +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableProducerApp.java @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.util.Map; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +@Getter +public class ExecutableProducerApp { + private final @NonNull ProducerTopicConfig topics; + private final @NonNull Map kafkaProperties; + private final @NonNull T app; + + public ProducerCleanUpRunner createCleanUpRunner() { + final ProducerCleanUpConfigurer configurer = new ProducerCleanUpConfigurer(); + this.app.setupCleanUp(configurer); + return ProducerCleanUpRunner.create(this.topics, this.kafkaProperties, configurer); + } + + public ProducerRunner createRunner() { + final ProducerBuilder producerBuilder = ProducerBuilder.builder() + .topics(this.topics) + .kafkaProperties(this.kafkaProperties) + .build(); + return new ProducerRunner(() -> this.app.run(producerBuilder)); + } +} diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java new file mode 100644 index 00000000..92c848cb --- /dev/null +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ExecutableStreamsApp.java @@ -0,0 +1,71 @@ +/* + * MIT License + * + * Copyright (c) 2024 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; + +@RequiredArgsConstructor +@Getter +public class ExecutableStreamsApp { + + private final @NonNull Topology topology; + private final @NonNull StreamsConfig kafkaProperties; + private final @NonNull T app; + + public StreamsCleanUpRunner createCleanUpRunner() { + final StreamsCleanUpConfigurer configurer = new StreamsCleanUpConfigurer(); + this.app.setupCleanUp(configurer); + configurer.registerFinishHook(this.app::close); + return StreamsCleanUpRunner.create(this.topology, this.kafkaProperties, configurer); + } + + public StreamsRunner createRunner() { + return this.createRunner(StreamsExecutionOptions.builder().build()); + } + + public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions) { + final StreamsHooks hooks = this.createHooks(); + return StreamsRunner.builder() + .topology(this.topology) + .config(this.kafkaProperties) + .executionOptions(executionOptions) + .hooks(hooks) + .build(); + } + + private StreamsHooks createHooks() { + return StreamsHooks.builder() + .stateListener(this.app.getStateListener()) + .uncaughtExceptionHandler(this.app.getUncaughtExceptionHandler()) + .onStart(this.app::onStreamsStart) + .onShutdown(this.app::close) + .build(); + } + +} diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppConfig.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppConfig.java index 7987b236..43c5e378 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppConfig.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsAppConfig.java @@ -33,17 +33,17 @@ @Value public class StreamsAppConfig { - @NonNull Map kafkaProperties; + @NonNull StreamsConfig kafkaProperties; public String getAppId() { - return (String) this.kafkaProperties.get(StreamsConfig.APPLICATION_ID_CONFIG); + return this.kafkaProperties.getString(StreamsConfig.APPLICATION_ID_CONFIG); } public String getBoostrapServers() { - return (String) this.kafkaProperties.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); + return this.kafkaProperties.getString(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); } public Map getKafkaProperties() { - return Collections.unmodifiableMap(this.kafkaProperties); + return Collections.unmodifiableMap(this.kafkaProperties.originals()); } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java index b30fffe0..18d97546 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java @@ -63,11 +63,11 @@ public final class StreamsCleanUpRunner { private final @NonNull StreamsCleanUpHooks cleanHooks; public static StreamsCleanUpRunner create(final @NonNull Topology topology, - final @NonNull Map kafkaProperties, final @NonNull StreamsCleanUpConfigurer cleanHooks) { + final @NonNull StreamsConfig kafkaProperties, final @NonNull StreamsCleanUpConfigurer cleanHooks) { final StreamsAppConfig streamsAppConfig = new StreamsAppConfig(kafkaProperties); final TopologyInformation topologyInformation = new TopologyInformation(topology, streamsAppConfig.getAppId()); return new StreamsCleanUpRunner(topologyInformation, topology, streamsAppConfig, - cleanHooks.create(kafkaProperties)); + cleanHooks.create(streamsAppConfig.getKafkaProperties())); } /** diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java index 8910bcb1..62b7ff7c 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/TopologyBuilder.java @@ -42,7 +42,6 @@ public class TopologyBuilder { StreamsBuilder streamsBuilder = new StreamsBuilder(); @NonNull StreamsTopicConfig topics; @NonNull Map kafkaProperties; - boolean cleanUp; public KStream streamInput(final Consumed consumed) { return this.streamsBuilder.stream(this.topics.getInputTopics(), consumed); diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/AvroMirrorTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/AvroMirrorTest.java index 37e2a840..f9796046 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/AvroMirrorTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/AvroMirrorTest.java @@ -32,15 +32,14 @@ import org.junit.jupiter.api.extension.RegisterExtension; class AvroMirrorTest { - final MirrorWithNonDefaultSerde app = new MirrorWithNonDefaultSerde(); - private final ConfiguredStreamsApp configuredApp = createApp(); + private final ConfiguredStreamsApp app = this.createApp(); @RegisterExtension final TestTopologyExtension testTopology = - StreamsBootstrapTopologyFactory.createTopologyExtensionWithSchemaRegistry(this.configuredApp); + StreamsBootstrapTopologyFactory.createTopologyExtensionWithSchemaRegistry(this.app); @Test void shouldMirror() { - final Serde valueSerde = this.app.getValueSerde( + final Serde valueSerde = this.app.getApp().getValueSerde( this.testTopology.getStreamsConfig().originals()); final TestRecord record = TestRecord.newBuilder() .setContent("bar") @@ -55,8 +54,9 @@ void shouldMirror() { .expectNoMoreRecord(); } - private ConfiguredStreamsApp createApp() { - return new ConfiguredStreamsApp(app, StreamsAppConfiguration.builder() + private ConfiguredStreamsApp createApp() { + return new ConfiguredStreamsApp(new MirrorWithNonDefaultSerde(), + StreamsAppConfiguration.builder() .topics(StreamsTopicConfig.builder() .inputTopics(List.of("input")) .outputTopic("output") diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/CloseFlagApp.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/CloseFlagApp.java index 56252902..952c2e90 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/CloseFlagApp.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/CloseFlagApp.java @@ -40,11 +40,10 @@ // @Override // public void close() { // this.closed = true; -// super.close(); // } // // @Override -// public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) { +// public void buildTopology(final TopologyBuilder builder) { // final KStream input = builder.streamInput(); // input.to(builder.getTopics().getOutputTopic()); // } diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java index 8e6729d2..f6b8caa7 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java @@ -37,8 +37,8 @@ class ConfiguredProducerAppTest { @Test void shouldPrioritizeConfigCLIParameters() { - final ConfiguredProducerApp configuredApp = - new ConfiguredProducerApp(new TestProducer(), ProducerAppConfiguration.builder() + final ConfiguredProducerApp configuredApp = + new ConfiguredProducerApp(new TestProducer(), ProducerAppConfiguration.builder() .kafkaConfig(Map.of( "foo", "baz", "kafka", "streams" @@ -54,8 +54,8 @@ void shouldPrioritizeConfigCLIParameters() { @Test void shouldSetDefaultAvroSerializerWhenSchemaRegistryUrlIsSet() { - final ConfiguredProducerApp configuredApp = - new ConfiguredProducerApp(new TestProducer(), ProducerAppConfiguration.builder() + final ConfiguredProducerApp configuredApp = + new ConfiguredProducerApp(new TestProducer(), ProducerAppConfiguration.builder() .build()); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") @@ -67,8 +67,8 @@ void shouldSetDefaultAvroSerializerWhenSchemaRegistryUrlIsSet() { @Test void shouldSetDefaultStringSerializerWhenSchemaRegistryUrlIsNotSet() { - final ConfiguredProducerApp configuredApp = - new ConfiguredProducerApp(new TestProducer(), ProducerAppConfiguration.builder() + final ConfiguredProducerApp configuredApp = + new ConfiguredProducerApp(new TestProducer(), ProducerAppConfiguration.builder() .build()); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java index 4b616d67..4dc3e2ff 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java @@ -38,8 +38,8 @@ class ConfiguredStreamsAppTest { @Test void shouldPrioritizeConfigCLIParameters() { - final ConfiguredStreamsApp configuredApp = - new ConfiguredStreamsApp(new TestApplication(), StreamsAppConfiguration.builder() + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp(new TestApplication(), StreamsAppConfiguration.builder() .kafkaConfig(Map.of( "foo", "baz", "kafka", "streams" @@ -55,8 +55,8 @@ void shouldPrioritizeConfigCLIParameters() { @Test void shouldSetDefaultAvroSerdeWhenSchemaRegistryUrlIsSet() { - final ConfiguredStreamsApp configuredApp = - new ConfiguredStreamsApp(new TestApplication(), StreamsAppConfiguration.builder() + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp(new TestApplication(), StreamsAppConfiguration.builder() .build()); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") @@ -69,8 +69,8 @@ void shouldSetDefaultAvroSerdeWhenSchemaRegistryUrlIsSet() { @Test void shouldSetDefaultStringSerdeWhenSchemaRegistryUrlIsNotSet() { - final ConfiguredStreamsApp configuredApp = - new ConfiguredStreamsApp(new TestApplication(), StreamsAppConfiguration.builder() + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp(new TestApplication(), StreamsAppConfiguration.builder() .build()); assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder() .brokers("fake") diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 87088c33..5b02223d 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -32,8 +32,13 @@ //import static org.mockito.Mockito.verifyNoMoreInteractions; // //import com.bakdata.kafka.CleanUpException; +//import com.bakdata.kafka.ConfiguredStreamsApp; +//import com.bakdata.kafka.StreamsApp; +//import com.bakdata.kafka.StreamsAppConfiguration; //import com.bakdata.kafka.StreamsCleanUpRunner; //import com.bakdata.kafka.CloseFlagApp; +//import com.bakdata.kafka.StreamsOptions; +//import com.bakdata.kafka.StreamsTopicConfig; //import com.bakdata.kafka.TestRecord; //import com.bakdata.kafka.test_applications.ComplexTopologyApplication; //import com.bakdata.kafka.test_applications.MirrorKeyWithAvro; @@ -96,7 +101,7 @@ // @RegisterExtension // final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); // private EmbeddedKafkaCluster kafkaCluster; -// private KafkaStreamsApplication app = null; +// private ConfiguredStreamsApp app = null; // @InjectSoftAssertions // private SoftAssertions softly; // @Mock @@ -110,10 +115,9 @@ // } // // @AfterEach -// void teardown() throws InterruptedException { +// void tearDown() throws InterruptedException { // if (this.app != null) { // this.app.close(); -// this.app.getStreams().cleanUp(); // this.app = null; // } // @@ -585,15 +589,15 @@ // this.app.close(); // } // -// private KafkaStreamsApplication createWordCountApplication() { +// private ConfiguredStreamsApp createWordCountApplication() { // return this.setupAppNoSr(new WordCount(), "word_input", "word_output", "word_error"); // } // -// private KafkaStreamsApplication createWordCountPatternApplication() { +// private ConfiguredStreamsApp createWordCountPatternApplication() { // return this.setupAppNoSr(new WordCountPattern(), Pattern.compile(".*_topic"), "word_output", "word_error"); // } // -// private KafkaStreamsApplication createMirrorValueApplication() { +// private ConfiguredStreamsApp createMirrorValueApplication() { // return this.setupAppWithSr(new MirrorValueWithAvro(), "input", "output", "key_error"); // } // @@ -601,16 +605,16 @@ // return this.setupAppWithSr(new CloseFlagApp(), "input", "output", "key_error"); // } // -// private KafkaStreamsApplication createMirrorKeyApplication() { +// private ConfiguredStreamsApp createMirrorKeyApplication() { // return this.setupAppWithSr(new MirrorKeyWithAvro(), "input", "output", "value_error"); // } // -// private KafkaStreamsApplication createComplexApplication() { +// private ConfiguredStreamsApp createComplexApplication() { // this.kafkaCluster.createTopic(TopicConfig.withName(ComplexTopologyApplication.THROUGH_TOPIC).useDefaults()); // return this.setupAppWithSr(new ComplexTopologyApplication(), "input", "output", "value_error"); // } // -// private KafkaStreamsApplication createComplexCleanUpHookApplication() { +// private ConfiguredStreamsApp createComplexCleanUpHookApplication() { // this.kafkaCluster.createTopic(TopicConfig.withName(ComplexTopologyApplication.THROUGH_TOPIC).useDefaults()); // return this.setupAppWithSr(new ComplexTopologyApplication() { // @Override @@ -621,37 +625,48 @@ // }, "input", "output", "value_error"); // } // -// private T setupAppWithSr(final T application, final String inputTopicName, +// private ConfiguredStreamsApp setupAppWithSr(final StreamsApp application, final String inputTopicName, // final String outputTopicName, final String errorTopicName) { -// this.setupApp(application, outputTopicName, errorTopicName); +// this.setupApp(application, StreamsTopicConfig.builder() +// .outputTopic(outputTopicName) +// .errorTopic(errorTopicName) +// .build()); // application.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl()); // application.setInputTopics(List.of(inputTopicName)); // return application; // } // -// private T setupAppNoSr(final T application, final String inputTopicName, +// private ConfiguredStreamsApp setupAppNoSr(final StreamsApp application, final String inputTopicName, // final String outputTopicName, final String errorTopicName) { -// this.setupApp(application, outputTopicName, errorTopicName); +// this.setupApp(application, StreamsTopicConfig.builder() +// .outputTopic(outputTopicName) +// .errorTopic(errorTopicName) +// .build()); // application.setInputTopics(List.of(inputTopicName)); // return application; // } // -// private T setupAppNoSr(final T application, final Pattern inputPattern, +// private ConfiguredStreamsApp setupAppNoSr(final StreamsApp application, final Pattern inputPattern, // final String outputTopicName, final String errorTopicName) { -// this.setupApp(application, outputTopicName, errorTopicName); +// this.setupApp(application, StreamsTopicConfig.builder() +// .outputTopic(outputTopicName) +// .errorTopic(errorTopicName) +// .build()); // application.setInputPattern(inputPattern); // return application; // } // -// private void setupApp(final T application, final String outputTopicName, -// final String errorTopicName) { -// application.setOutputTopic(outputTopicName); -// application.setErrorTopic(errorTopicName); -// application.setBrokers(this.kafkaCluster.getBrokerList()); -// application.setProductive(false); -// application.setKafkaConfig(Map.of( +// private ConfiguredStreamsApp setupApp(final StreamsApp application, StreamsTopicConfig +// streamsTopicConfig) { +// return new ConfiguredStreamsApp(application, StreamsAppConfiguration.builder() +// .topics(streamsTopicConfig) +// .options(StreamsOptions.builder() +// .productive(false) +// .build()) +// .kafkaConfig(Map.of( // StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0", // ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" -// )); +// )) +// .build()); // } //} diff --git a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java index 2d48c1e6..75d599fd 100644 --- a/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java +++ b/streams-bootstrap/src/test/java/com/bakdata/kafka/util/TopologyInformationTest.java @@ -57,8 +57,8 @@ void setup() { .inputTopics(List.of("input", "input2")) .outputTopic("output") .build(); - final ConfiguredStreamsApp configuredApp = - new ConfiguredStreamsApp(this.app, StreamsAppConfiguration.builder() + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp(this.app, StreamsAppConfiguration.builder() .topics(this.topics) .build()); final Map kafkaProperties = configuredApp.getKafkaProperties(