diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaApplication.java index 9cac0b42..eeb335da 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -44,11 +44,19 @@ import picocli.CommandLine.ParseResult; /** - *

The base class of the entry point of the Kafka application.

- * This class provides common configuration options, e.g., {@link #brokers}, for Kafka applications. Hereby it - * automatically populates the passed in command line arguments with matching environment arguments - * {@link EnvironmentArgumentsParser}. To implement your Kafka application inherit from this class and add your custom - * options. + *

The base class for creating Kafka applications.

+ * This class provides the following configuration options: + * + * To implement your Kafka application inherit from this class and add your custom options. Run it by calling + * {@link #startApplication(KafkaApplication, String[])} with a instance of your class from your main. */ @ToString @Getter @@ -110,6 +118,9 @@ private static String[] addEnvironmentVariablesArguments(final String[] args) { return allArgs.toArray(String[]::new); } + /** + * Clean all resources associated with this application + */ public abstract void clean(); @Override @@ -117,11 +128,19 @@ public void close() { // do nothing by default } + /** + * Configure application when running in debug mode. By default, Log4j2 log level is configured to debug for + * {@code com.bakdata} and the applications package. + */ protected void configureDebug() { Configurator.setLevel("com.bakdata", Level.DEBUG); Configurator.setLevel(this.getClass().getPackageName(), Level.DEBUG); } + /** + * Create {@code KafkaEndpointConfig} specified by {@link #brokers} and {@link #schemaRegistryUrl} + * @return {@code KafkaEndpointConfig} + */ protected KafkaEndpointConfig getEndpointConfig() { return KafkaEndpointConfig.builder() .brokers(this.brokers) diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java index f5654b73..e1697c73 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java @@ -35,12 +35,10 @@ /** - *

The base class of the entry point of a producer application.

- * This class provides common configuration options, e.g., {@link #brokers}, for producer applications. Hereby it - * automatically populates the passed in command line arguments with matching environment arguments - * {@link EnvironmentArgumentsParser}. To implement your producer application inherit from this class and add your - * custom options. Call {@link #startApplication(KafkaApplication, String[])} with a fresh instance of your class from - * your main. + *

The base class for creating Kafka Producer applications.

+ * This class provides all configuration options provided by {@link KafkaApplication}. + * To implement your Kafka Producer application inherit from this class and add your custom options. Run it by + * calling {@link #startApplication(KafkaApplication, String[])} with a instance of your class from your main. */ @ToString(callSuper = true) @Getter @@ -54,6 +52,10 @@ public abstract class KafkaProducerApplication extends KafkaApplication { // concurrently iterating on #runners and removing from #runners private ConcurrentLinkedDeque> runningApps = new ConcurrentLinkedDeque<>(); + /** + * Run the application. + * @see ProducerRunner#run() + */ @Override public void run() { try (final ExecutableProducerApp app = this.createExecutableApp()) { @@ -64,6 +66,9 @@ public void run() { } } + /** + * Delete all output topics associated with the Kafka Producer application. + */ @Command(description = "Delete all output topics associated with the Kafka Producer application.") @Override public void clean() { @@ -73,13 +78,35 @@ public void clean() { } } - public ConfiguredProducerApp createConfiguredApp() { + /** + * @see #stop() + */ + @Override + public void close() { + super.close(); + this.stop(); + } + + /** + * Stop all applications that have been started by {@link #run()}. + */ + public void stop() { + this.runningApps.forEach(ExecutableProducerApp::close); + } + + /** + * Create a new {@code ProducerApp} that will be configured and executed according to this application. + * @return {@code ProducerApp} + */ + protected abstract ProducerApp createApp(); + + private ConfiguredProducerApp createConfiguredApp() { final ProducerApp producerApp = this.createApp(); final ProducerAppConfiguration configuration = this.createConfiguration(); return new ConfiguredProducerApp<>(producerApp, configuration); } - public ProducerAppConfiguration createConfiguration() { + private ProducerAppConfiguration createConfiguration() { final ProducerTopicConfig topics = this.createTopicConfig(); final Map kafkaConfig = this.getKafkaConfig(); return ProducerAppConfiguration.builder() @@ -88,25 +115,13 @@ public ProducerAppConfiguration createConfiguration() { .build(); } - public ProducerTopicConfig createTopicConfig() { + private ProducerTopicConfig createTopicConfig() { return ProducerTopicConfig.builder() .outputTopic(this.getOutputTopic()) .extraOutputTopics(this.getExtraOutputTopics()) .build(); } - public abstract ProducerApp createApp(); - - @Override - public void close() { - super.close(); - this.stop(); - } - - public void stop() { - this.runningApps.forEach(ExecutableProducerApp::close); - } - private ExecutableProducerApp createExecutableApp() { final ConfiguredProducerApp app = this.createConfiguredApp(); final KafkaEndpointConfig endpointConfig = this.getEndpointConfig(); diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 63d8818b..531d127d 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -45,12 +45,19 @@ /** - *

The base class of the entry point of the streaming application.

- * This class provides common configuration options e.g. {@link #brokers}, {@link #productive} for streaming - * application. Hereby it automatically populates the passed in command line arguments with matching environment - * arguments {@link EnvironmentArgumentsParser}. To implement your streaming application inherit from this class and add - * your custom options. Call {@link #startApplication(KafkaApplication, String[])} with a fresh instance of your class - * from your main. + *

The base class for creating Kafka Streams applications.

+ * This class provides the following configuration options in addition to those provided by {@link KafkaApplication}: + *
    + *
  • {@link #inputTopics}
  • + *
  • {@link #inputPattern}
  • + *
  • {@link #errorTopic}
  • + *
  • {@link #extraInputTopics}
  • + *
  • {@link #extraInputPatterns}
  • + *
  • {@link #productive}
  • + *
  • {@link #volatileGroupInstanceId}
  • + *
+ * To implement your Kafka Streams application inherit from this class and add your custom options. Run it by calling + * {@link #startApplication(KafkaApplication, String[])} with a instance of your class from your main. */ @ToString(callSuper = true) @Getter @@ -83,8 +90,8 @@ public abstract class KafkaStreamsApplication extends KafkaApplication { private ConcurrentLinkedDeque runningApps = new ConcurrentLinkedDeque<>(); /** - * Run the application. If Kafka Streams is run, this method blocks until Kafka Streams has completed shutdown, - * either because it caught an error or the application has received a shutdown event. + * Run the application. + * @see StreamsRunner#run() */ @Override public void run() { @@ -95,10 +102,17 @@ public void run() { } } - public void stop() { + /** + * Stop all applications that have been started by {@link #run()}. + */ + public final void stop() { this.runningApps.forEach(RunningApp::close); } + /** + * Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate + * topics associated with the Kafka Streams application. + */ @Command( description = "Reset the Kafka Streams application. Additionally, delete the consumer group and all " + "output and intermediate topics associated with the Kafka Streams application.") @@ -110,12 +124,19 @@ public void clean() { } } + /** + * @see #stop() + */ @Override public void close() { super.close(); this.stop(); } + /** + * Clear all state stores, consumer group offsets, and internal topics associated with the Kafka Streams + * application. + */ @Command( description = "Clear all state stores, consumer group offsets, and internal topics associated with the " + "Kafka Streams application.") @@ -126,7 +147,43 @@ public void reset() { } } - public StreamsRunner createRunner(final ExecutableStreamsApp app) { + /** + * Create a new {@code StreamsApp} that will be configured and executed according to this application. + * @param cleanUp whether {@code StreamsApp} is created for clean up purposes. In that case, the user might want + * to skip initialization of expensive resources. + * @return {@code StreamsApp} + */ + protected abstract StreamsApp createApp(boolean cleanUp); + + /** + * Create a {@link StateListener} to use for Kafka Streams. + * + * @return {@code StateListener}. {@link NoOpStateListener} by default + * @see KafkaStreams#setStateListener(StateListener) + */ + protected StateListener createStateListener() { + return new NoOpStateListener(); + } + + /** + * Create a {@link StreamsUncaughtExceptionHandler} to use for Kafka Streams. + * + * @return {@code StreamsUncaughtExceptionHandler}. {@link ShutdownClientUncaughtExceptionHandler} by default + * @see KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler) + */ + protected StreamsUncaughtExceptionHandler createUncaughtExceptionHandler() { + return new ShutdownClientUncaughtExceptionHandler(); + } + + /** + * Called after starting Kafka Streams + * @param streams running {@code KafkaStreams} instance + */ + protected void onStreamsStart(final KafkaStreams streams) { + // do nothing by default + } + + private StreamsRunner createRunner(final ExecutableStreamsApp app) { final StreamsExecutionOptions executionOptions = this.createExecutionOptions(); final StreamsHooks hooks = this.createHooks(); return app.createRunner(executionOptions, hooks); @@ -138,38 +195,19 @@ private RunningApp createRunningApp() { return new RunningApp(app, runner); } - public abstract StreamsApp createApp(boolean cleanUp); - - public StreamsExecutionOptions createExecutionOptions() { + private StreamsExecutionOptions createExecutionOptions() { return StreamsExecutionOptions.builder() .volatileGroupInstanceId(this.volatileGroupInstanceId) .build(); } - @RequiredArgsConstructor - private static class RunningApp implements AutoCloseable { - private final @NonNull ExecutableStreamsApp app; - private final @NonNull StreamsRunner runner; - - @Override - public void close() { - this.runner.close(); - // close app after streams because messages currently processed might depend on resources - this.app.close(); - } - - private void run() { - this.runner.run(); - } - } - - public ConfiguredStreamsApp createConfiguredApp(final boolean cleanUp) { + private ConfiguredStreamsApp createConfiguredApp(final boolean cleanUp) { final StreamsApp streamsApp = this.createApp(cleanUp); final StreamsAppConfiguration streamsAppConfiguration = this.createConfiguration(); return new ConfiguredStreamsApp<>(streamsApp, streamsAppConfiguration); } - public StreamsAppConfiguration createConfiguration() { + private StreamsAppConfiguration createConfiguration() { final StreamsTopicConfig topics = this.createTopicConfig(); final Map kafkaConfig = this.getKafkaConfig(); final StreamsOptions streamsOptions = this.createStreamsOptions(); @@ -180,7 +218,7 @@ public StreamsAppConfiguration createConfiguration() { .build(); } - public StreamsTopicConfig createTopicConfig() { + private StreamsTopicConfig createTopicConfig() { return StreamsTopicConfig.builder() .inputTopics(this.inputTopics) .extraInputTopics(this.extraInputTopics) @@ -192,40 +230,16 @@ public StreamsTopicConfig createTopicConfig() { .build(); } - public ExecutableStreamsApp createExecutableApp(final boolean cleanUp) { + private ExecutableStreamsApp createExecutableApp(final boolean cleanUp) { final ConfiguredStreamsApp configuredStreamsApp = this.createConfiguredApp(cleanUp); final KafkaEndpointConfig endpointConfig = this.getEndpointConfig(); return configuredStreamsApp.withEndpoint(endpointConfig); } - /** - * Create a {@link StateListener} to use for Kafka Streams. - * - * @return {@code StateListener}. - * @see KafkaStreams#setStateListener(StateListener) - */ - protected StateListener getStateListener() { - return new NoOpStateListener(); - } - - /** - * Create a {@link StreamsUncaughtExceptionHandler} to use for Kafka Streams. - * - * @return {@code StreamsUncaughtExceptionHandler}. - * @see KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler) - */ - protected StreamsUncaughtExceptionHandler getUncaughtExceptionHandler() { - return new DefaultStreamsUncaughtExceptionHandler(); - } - - protected void onStreamsStart(final KafkaStreams streams) { - // do nothing by default - } - private StreamsHooks createHooks() { return StreamsHooks.builder() - .uncaughtExceptionHandler(this.getUncaughtExceptionHandler()) - .stateListener(this.getStateListener()) + .uncaughtExceptionHandler(this.createUncaughtExceptionHandler()) + .stateListener(this.createStateListener()) .onStart(this::onStreamsStart) .build(); } @@ -235,4 +249,21 @@ private StreamsOptions createStreamsOptions() { .productive(this.productive) .build(); } + + @RequiredArgsConstructor + private static class RunningApp implements AutoCloseable { + private final @NonNull ExecutableStreamsApp app; + private final @NonNull StreamsRunner runner; + + @Override + public void close() { + this.runner.close(); + // close app after streams because messages currently processed might depend on resources + this.app.close(); + } + + private void run() { + this.runner.run(); + } + } } diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java index 288044c5..fdd61e93 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java @@ -28,6 +28,9 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; +/** + * {@code KafkaProducerApplication} without any additional configuration options. + */ @RequiredArgsConstructor public class SimpleKafkaProducerApplication extends KafkaProducerApplication { private final @NonNull Supplier appFactory; diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java index 0abb9f39..473142ce 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java @@ -24,17 +24,29 @@ package com.bakdata.kafka; +import java.util.function.Function; import java.util.function.Supplier; import lombok.NonNull; import lombok.RequiredArgsConstructor; +/** + * {@code KafkaStreamsApplication} without any additional configuration options. + */ @RequiredArgsConstructor public class SimpleKafkaStreamsApplication extends KafkaStreamsApplication { - private final @NonNull Supplier appFactory; + private final @NonNull Function appFactory; + + /** + * Create new {@code SimpleKafkaStreamsApplication} + * @param appFactory factory to create {@code StreamsApp} without any parameters + */ + public SimpleKafkaStreamsApplication(final Supplier appFactory) { + this(cleanUp -> appFactory.get()); + } @Override public StreamsApp createApp(final boolean cleanUp) { - return this.appFactory.get(); + return this.appFactory.apply(cleanUp); } } diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/StringListConverter.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/StringListConverter.java index b78eaa21..a31657bc 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/StringListConverter.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/StringListConverter.java @@ -32,10 +32,10 @@ * Converter for lists inside collection type parsed by PicoCLI. List members need to be separated by {@code ;} */ public class StringListConverter implements ITypeConverter> { - private static final Splitter TOPIC_SPLITTER = Splitter.on(";").omitEmptyStrings().trimResults(); + private static final Splitter SPLITTER = Splitter.on(";").omitEmptyStrings().trimResults(); @Override public List convert(final String value) { - return TOPIC_SPLITTER.splitToList(value); + return SPLITTER.splitToList(value); } } diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/DefaultStreamsUncaughtExceptionHandler.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/ShutdownClientUncaughtExceptionHandler.java similarity index 93% rename from streams-bootstrap/src/main/java/com/bakdata/kafka/DefaultStreamsUncaughtExceptionHandler.java rename to streams-bootstrap/src/main/java/com/bakdata/kafka/ShutdownClientUncaughtExceptionHandler.java index 341a1ad4..61951fdd 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/DefaultStreamsUncaughtExceptionHandler.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/ShutdownClientUncaughtExceptionHandler.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2023 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -26,7 +26,7 @@ import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; -class DefaultStreamsUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler { +class ShutdownClientUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler { @Override public StreamThreadExceptionResponse handle(final Throwable e) { return StreamThreadExceptionResponse.SHUTDOWN_CLIENT; diff --git a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsHooks.java b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsHooks.java index 35a54a34..0961ca4a 100644 --- a/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsHooks.java +++ b/streams-bootstrap/src/main/java/com/bakdata/kafka/StreamsHooks.java @@ -41,7 +41,7 @@ public class StreamsHooks { @Getter @Builder.Default private final @NonNull StreamsUncaughtExceptionHandler uncaughtExceptionHandler = - new DefaultStreamsUncaughtExceptionHandler(); + new ShutdownClientUncaughtExceptionHandler(); public void onStart(final KafkaStreams streams) { this.onStart.accept(streams);