Commit

Create v3
philipp94831 committed Apr 4, 2024
1 parent f374ffc commit 2023565
Showing 8 changed files with 173 additions and 93 deletions.
@@ -44,11 +44,19 @@
import picocli.CommandLine.ParseResult;

/**
* <p>The base class of the entry point of the Kafka application.</p>
* This class provides common configuration options, e.g., {@link #brokers}, for Kafka applications. Hereby it
* automatically populates the passed in command line arguments with matching environment arguments
* {@link EnvironmentArgumentsParser}. To implement your Kafka application inherit from this class and add your custom
* options.
* <p>The base class for creating Kafka applications.</p>
* This class provides the following configuration options:
* <ul>
* <li>{@link #brokers}</li>
* <li>{@link #outputTopic}</li>
* <li>{@link #extraOutputTopics}</li>
* <li>{@link #debug}</li>
* <li>{@link #schemaRegistryUrl}</li>
* <li>{@link #kafkaConfig}</li>
* </ul>
* To implement your Kafka application, inherit from this class and add your custom options. Run it by calling
* {@link #startApplication(KafkaApplication, String[])} with an instance of your class from your main method.
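* <p>A minimal sketch of this pattern, assuming the static {@link #startApplication(KafkaApplication, String[])}
* entry point; the class name {@code MyStreamsApplication} and the {@code --my-option} flag are illustrative, and a
* real application would typically extend one of the concrete subclasses, e.g. for Kafka Streams or producer
* applications:
* <pre>{@code
* public class MyStreamsApplication extends KafkaStreamsApplication {
*     @CommandLine.Option(names = "--my-option", description = "A custom option")
*     private String myOption;
*
*     public static void main(final String[] args) {
*         startApplication(new MyStreamsApplication(), args);
*     }
*
*     // implement the remaining abstract methods, e.g. createApp(...)
* }
* }</pre>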
*/
@ToString
@Getter
@@ -110,18 +118,29 @@ private static String[] addEnvironmentVariablesArguments(final String[] args) {
return allArgs.toArray(String[]::new);
}

/**
* Clean all resources associated with this application.
*/
public abstract void clean();

@Override
public void close() {
// do nothing by default
}

/**
* Configure the application when running in debug mode. By default, the Log4j 2 log level is set to debug for
* {@code com.bakdata} and the application's package.
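* <p>For example, an override could enable debug logging for an additional package; the package name here is
* illustrative:
* <pre>{@code
* @Override
* protected void configureDebug() {
*     super.configureDebug();
*     Configurator.setLevel("org.apache.kafka", Level.DEBUG);
* }
* }</pre>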
*/
protected void configureDebug() {
Configurator.setLevel("com.bakdata", Level.DEBUG);
Configurator.setLevel(this.getClass().getPackageName(), Level.DEBUG);
}

/**
* Create the {@code KafkaEndpointConfig} specified by {@link #brokers} and {@link #schemaRegistryUrl}.
* @return {@code KafkaEndpointConfig}
*/
protected KafkaEndpointConfig getEndpointConfig() {
return KafkaEndpointConfig.builder()
.brokers(this.brokers)
@@ -35,12 +35,10 @@


/**
* <p>The base class of the entry point of a producer application.</p>
* This class provides common configuration options, e.g., {@link #brokers}, for producer applications. Hereby it
* automatically populates the passed in command line arguments with matching environment arguments
* {@link EnvironmentArgumentsParser}. To implement your producer application inherit from this class and add your
* custom options. Call {@link #startApplication(KafkaApplication, String[])} with a fresh instance of your class from
* your main.
* <p>The base class for creating Kafka Producer applications.</p>
* This class provides all configuration options provided by {@link KafkaApplication}.
* To implement your Kafka Producer application, inherit from this class and add your custom options. Run it by
* calling {@link #startApplication(KafkaApplication, String[])} with an instance of your class from your main method.
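* <p>A minimal sketch, where {@code MyProducerApp} stands for any {@code ProducerApp} implementation and is not
* part of this API:
* <pre>{@code
* public class MyProducerApplication extends KafkaProducerApplication {
*     public static void main(final String[] args) {
*         startApplication(new MyProducerApplication(), args);
*     }
*
*     @Override
*     protected ProducerApp createApp() {
*         return new MyProducerApp(); // hypothetical ProducerApp implementation
*     }
* }
* }</pre>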
*/
@ToString(callSuper = true)
@Getter
@@ -54,6 +52,10 @@ public abstract class KafkaProducerApplication extends KafkaApplication {
// concurrently iterating on #runners and removing from #runners
private ConcurrentLinkedDeque<ExecutableProducerApp<ProducerApp>> runningApps = new ConcurrentLinkedDeque<>();

/**
* Run the application.
* @see ProducerRunner#run()
*/
@Override
public void run() {
try (final ExecutableProducerApp<ProducerApp> app = this.createExecutableApp()) {
@@ -64,6 +66,9 @@ public void run() {
}
}

/**
* Delete all output topics associated with the Kafka Producer application.
*/
@Command(description = "Delete all output topics associated with the Kafka Producer application.")
@Override
public void clean() {
@@ -73,13 +78,35 @@ public void clean() {
}
}

public ConfiguredProducerApp<ProducerApp> createConfiguredApp() {
/**
* @see #stop()
*/
@Override
public void close() {
super.close();
this.stop();
}

/**
* Stop all applications that have been started by {@link #run()}.
*/
public void stop() {
this.runningApps.forEach(ExecutableProducerApp::close);
}

/**
* Create a new {@code ProducerApp} that will be configured and executed according to this application.
* @return {@code ProducerApp}
*/
protected abstract ProducerApp createApp();

private ConfiguredProducerApp<ProducerApp> createConfiguredApp() {
final ProducerApp producerApp = this.createApp();
final ProducerAppConfiguration configuration = this.createConfiguration();
return new ConfiguredProducerApp<>(producerApp, configuration);
}

public ProducerAppConfiguration createConfiguration() {
private ProducerAppConfiguration createConfiguration() {
final ProducerTopicConfig topics = this.createTopicConfig();
final Map<String, String> kafkaConfig = this.getKafkaConfig();
return ProducerAppConfiguration.builder()
@@ -88,25 +115,13 @@ public ProducerAppConfiguration createConfiguration() {
.build();
}

public ProducerTopicConfig createTopicConfig() {
private ProducerTopicConfig createTopicConfig() {
return ProducerTopicConfig.builder()
.outputTopic(this.getOutputTopic())
.extraOutputTopics(this.getExtraOutputTopics())
.build();
}

public abstract ProducerApp createApp();

@Override
public void close() {
super.close();
this.stop();
}

public void stop() {
this.runningApps.forEach(ExecutableProducerApp::close);
}

private ExecutableProducerApp<ProducerApp> createExecutableApp() {
final ConfiguredProducerApp<ProducerApp> app = this.createConfiguredApp();
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
@@ -45,12 +45,19 @@


/**
* <p>The base class of the entry point of the streaming application.</p>
* This class provides common configuration options e.g. {@link #brokers}, {@link #productive} for streaming
* application. Hereby it automatically populates the passed in command line arguments with matching environment
* arguments {@link EnvironmentArgumentsParser}. To implement your streaming application inherit from this class and add
* your custom options. Call {@link #startApplication(KafkaApplication, String[])} with a fresh instance of your class
* from your main.
* <p>The base class for creating Kafka Streams applications.</p>
* This class provides the following configuration options in addition to those provided by {@link KafkaApplication}:
* <ul>
* <li>{@link #inputTopics}</li>
* <li>{@link #inputPattern}</li>
* <li>{@link #errorTopic}</li>
* <li>{@link #extraInputTopics}</li>
* <li>{@link #extraInputPatterns}</li>
* <li>{@link #productive}</li>
* <li>{@link #volatileGroupInstanceId}</li>
* </ul>
* To implement your Kafka Streams application, inherit from this class and add your custom options. Run it by calling
* {@link #startApplication(KafkaApplication, String[])} with an instance of your class from your main method.
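* <p>A minimal sketch, where {@code MyStreamsApp} stands for any {@code StreamsApp} implementation and is not
* part of this API:
* <pre>{@code
* public class MyStreamsApplication extends KafkaStreamsApplication {
*     public static void main(final String[] args) {
*         startApplication(new MyStreamsApplication(), args);
*     }
*
*     @Override
*     protected StreamsApp createApp(final boolean cleanUp) {
*         return new MyStreamsApp(); // hypothetical StreamsApp implementation
*     }
* }
* }</pre>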
*/
@ToString(callSuper = true)
@Getter
@@ -83,8 +90,8 @@ public abstract class KafkaStreamsApplication extends KafkaApplication {
private ConcurrentLinkedDeque<RunningApp> runningApps = new ConcurrentLinkedDeque<>();

/**
* Run the application. If Kafka Streams is run, this method blocks until Kafka Streams has completed shutdown,
* either because it caught an error or the application has received a shutdown event.
* Run the application.
* @see StreamsRunner#run()
*/
@Override
public void run() {
@@ -95,10 +102,17 @@ public void run() {
}
}

public void stop() {
/**
* Stop all applications that have been started by {@link #run()}.
*/
public final void stop() {
this.runningApps.forEach(RunningApp::close);
}

/**
* Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate
* topics associated with the Kafka Streams application.
*/
@Command(
description = "Reset the Kafka Streams application. Additionally, delete the consumer group and all "
+ "output and intermediate topics associated with the Kafka Streams application.")
@@ -110,12 +124,19 @@ public void clean() {
}
}

/**
* @see #stop()
*/
@Override
public void close() {
super.close();
this.stop();
}

/**
* Clear all state stores, consumer group offsets, and internal topics associated with the Kafka Streams
* application.
*/
@Command(
description = "Clear all state stores, consumer group offsets, and internal topics associated with the "
+ "Kafka Streams application.")
@@ -126,7 +147,43 @@ public void reset() {
}
}

public StreamsRunner createRunner(final ExecutableStreamsApp<StreamsApp> app) {
/**
* Create a new {@code StreamsApp} that will be configured and executed according to this application.
* @param cleanUp whether {@code StreamsApp} is created for cleanup purposes. In that case, the user might want
* to skip initialization of expensive resources.
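* <p>For example, an implementation might skip creating expensive resources during clean up; the
* {@code MyStreamsApp} constructor and {@code createExpensiveClient()} helper are hypothetical:
* <pre>{@code
* @Override
* protected StreamsApp createApp(final boolean cleanUp) {
*     if (cleanUp) {
*         return new MyStreamsApp(null); // the expensive client is not needed for clean up
*     }
*     return new MyStreamsApp(createExpensiveClient());
* }
* }</pre>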
* @return {@code StreamsApp}
*/
protected abstract StreamsApp createApp(boolean cleanUp);

/**
* Create a {@link StateListener} to use for Kafka Streams.
*
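* <p>For example, an override could log state transitions; {@code StateListener} has a single method, so a lambda
* works, and the logging target is illustrative:
* <pre>{@code
* @Override
* protected StateListener createStateListener() {
*     return (newState, oldState) -> System.out.println("Streams state changed from " + oldState + " to " + newState);
* }
* }</pre>
*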
* @return {@code StateListener}. {@link NoOpStateListener} by default
* @see KafkaStreams#setStateListener(StateListener)
*/
protected StateListener createStateListener() {
return new NoOpStateListener();
}

/**
* Create a {@link StreamsUncaughtExceptionHandler} to use for Kafka Streams.
*
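* <p>For example, an override could replace a failed stream thread instead of shutting down the client, using the
* standard Kafka Streams response enum:
* <pre>{@code
* @Override
* protected StreamsUncaughtExceptionHandler createUncaughtExceptionHandler() {
*     return exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
* }
* }</pre>
*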
* @return {@code StreamsUncaughtExceptionHandler}. {@link ShutdownClientUncaughtExceptionHandler} by default
* @see KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)
*/
protected StreamsUncaughtExceptionHandler createUncaughtExceptionHandler() {
return new ShutdownClientUncaughtExceptionHandler();
}

/**
* Called after starting Kafka Streams.
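* <p>For example, an override could expose the running instance to a health check; the {@code healthCheck} field
* is hypothetical:
* <pre>{@code
* @Override
* protected void onStreamsStart(final KafkaStreams streams) {
*     this.healthCheck.register(streams); // hypothetical health check registry
* }
* }</pre>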
* @param streams running {@code KafkaStreams} instance
*/
protected void onStreamsStart(final KafkaStreams streams) {
// do nothing by default
}

private StreamsRunner createRunner(final ExecutableStreamsApp<StreamsApp> app) {
final StreamsExecutionOptions executionOptions = this.createExecutionOptions();
final StreamsHooks hooks = this.createHooks();
return app.createRunner(executionOptions, hooks);
@@ -138,38 +195,19 @@ private RunningApp createRunningApp() {
return new RunningApp(app, runner);
}

public abstract StreamsApp createApp(boolean cleanUp);

public StreamsExecutionOptions createExecutionOptions() {
private StreamsExecutionOptions createExecutionOptions() {
return StreamsExecutionOptions.builder()
.volatileGroupInstanceId(this.volatileGroupInstanceId)
.build();
}

@RequiredArgsConstructor
private static class RunningApp implements AutoCloseable {
private final @NonNull ExecutableStreamsApp<StreamsApp> app;
private final @NonNull StreamsRunner runner;

@Override
public void close() {
this.runner.close();
// close app after streams because messages currently processed might depend on resources
this.app.close();
}

private void run() {
this.runner.run();
}
}

public ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final boolean cleanUp) {
private ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final boolean cleanUp) {
final StreamsApp streamsApp = this.createApp(cleanUp);
final StreamsAppConfiguration streamsAppConfiguration = this.createConfiguration();
return new ConfiguredStreamsApp<>(streamsApp, streamsAppConfiguration);
}

public StreamsAppConfiguration createConfiguration() {
private StreamsAppConfiguration createConfiguration() {
final StreamsTopicConfig topics = this.createTopicConfig();
final Map<String, String> kafkaConfig = this.getKafkaConfig();
final StreamsOptions streamsOptions = this.createStreamsOptions();
@@ -180,7 +218,7 @@ public StreamsAppConfiguration createConfiguration() {
.build();
}

public StreamsTopicConfig createTopicConfig() {
private StreamsTopicConfig createTopicConfig() {
return StreamsTopicConfig.builder()
.inputTopics(this.inputTopics)
.extraInputTopics(this.extraInputTopics)
@@ -192,40 +230,16 @@ public StreamsTopicConfig createTopicConfig() {
.build();
}

public ExecutableStreamsApp<StreamsApp> createExecutableApp(final boolean cleanUp) {
private ExecutableStreamsApp<StreamsApp> createExecutableApp(final boolean cleanUp) {
final ConfiguredStreamsApp<StreamsApp> configuredStreamsApp = this.createConfiguredApp(cleanUp);
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
return configuredStreamsApp.withEndpoint(endpointConfig);
}

/**
* Create a {@link StateListener} to use for Kafka Streams.
*
* @return {@code StateListener}.
* @see KafkaStreams#setStateListener(StateListener)
*/
protected StateListener getStateListener() {
return new NoOpStateListener();
}

/**
* Create a {@link StreamsUncaughtExceptionHandler} to use for Kafka Streams.
*
* @return {@code StreamsUncaughtExceptionHandler}.
* @see KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)
*/
protected StreamsUncaughtExceptionHandler getUncaughtExceptionHandler() {
return new DefaultStreamsUncaughtExceptionHandler();
}

protected void onStreamsStart(final KafkaStreams streams) {
// do nothing by default
}

private StreamsHooks createHooks() {
return StreamsHooks.builder()
.uncaughtExceptionHandler(this.getUncaughtExceptionHandler())
.stateListener(this.getStateListener())
.uncaughtExceptionHandler(this.createUncaughtExceptionHandler())
.stateListener(this.createStateListener())
.onStart(this::onStreamsStart)
.build();
}
@@ -235,4 +249,21 @@ private StreamsOptions createStreamsOptions() {
.productive(this.productive)
.build();
}

@RequiredArgsConstructor
private static class RunningApp implements AutoCloseable {
private final @NonNull ExecutableStreamsApp<StreamsApp> app;
private final @NonNull StreamsRunner runner;

@Override
public void close() {
this.runner.close();
// close app after streams because messages currently processed might depend on resources
this.app.close();
}

private void run() {
this.runner.run();
}
}
}