Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 3, 2024
1 parent 00380b1 commit cf85b66
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void clean() {
public ConfiguredProducerApp<ProducerApp> createConfiguredApp() {
final ProducerApp producerApp = this.createApp();
final ProducerAppConfiguration configuration = this.createConfiguration();
return new ConfiguredProducerApp<ProducerApp>(producerApp, configuration);
return new ConfiguredProducerApp<>(producerApp, configuration);
}

public ProducerAppConfiguration createConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KafkaStreams.StateListener;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.UseDefaultConverter;
Expand Down Expand Up @@ -99,6 +103,7 @@ public void close() {
public void clean() {
final StreamsCleanUpRunner runner = this.createCleanUpRunner();
runner.clean();
this.onStreamsShutdown();
}

@Command(description = "Clear the state store and the global Kafka offsets for the "
Expand All @@ -107,6 +112,7 @@ public void clean() {
public void reset() {
final StreamsCleanUpRunner runner = this.createCleanUpRunner();
runner.reset();
this.onStreamsShutdown();
}

public abstract StreamsApp createApp(boolean cleanUp);
Expand All @@ -120,7 +126,8 @@ public StreamsExecutionOptions createExecutionOptions() {
public StreamsRunner createRunner() {
final ExecutableStreamsApp<StreamsApp> executableStreamsApp = this.createExecutableApp(false);
final StreamsExecutionOptions executionOptions = this.createExecutionOptions();
return executableStreamsApp.createRunner(executionOptions);
final StreamsHooks hooks = this.createHooks();
return executableStreamsApp.createRunner(executionOptions, hooks);
}

public StreamsCleanUpRunner createCleanUpRunner() {
Expand All @@ -131,7 +138,7 @@ public StreamsCleanUpRunner createCleanUpRunner() {
public ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final boolean cleanUp) {
final StreamsApp streamsApp = this.createApp(cleanUp);
final StreamsAppConfiguration streamsAppConfiguration = this.createConfiguration();
return new ConfiguredStreamsApp<StreamsApp>(streamsApp, streamsAppConfiguration);
return new ConfiguredStreamsApp<>(streamsApp, streamsAppConfiguration);
}

public StreamsAppConfiguration createConfiguration() {
Expand Down Expand Up @@ -163,6 +170,47 @@ public ExecutableStreamsApp<StreamsApp> createExecutableApp(final boolean cleanU
return configuredStreamsApp.withEndpoint(endpointConfig);
}

/**
* Create a {@link StateListener} to use for Kafka Streams.
*
* @return {@code StateListener}.
* @see KafkaStreams#setStateListener(StateListener)
*/
protected StateListener getStateListener() {
return new NoOpStateListener();
}

/**
* Create a {@link StreamsUncaughtExceptionHandler} to use for Kafka Streams.
*
* @return {@code StreamsUncaughtExceptionHandler}.
* @see KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)
*/
protected StreamsUncaughtExceptionHandler getUncaughtExceptionHandler() {
return new DefaultStreamsUncaughtExceptionHandler();
}

protected void onStreamsStart(final KafkaStreams streams) {
// do nothing by default
}

/**
* Method to close resources outside of {@link KafkaStreams}. Will be called by default on and on
* transitioning to {@link State#ERROR}.
*/
protected void onStreamsShutdown() {
// do nothing by default
}

private StreamsHooks createHooks() {
return StreamsHooks.builder()
.uncaughtExceptionHandler(this.getUncaughtExceptionHandler())
.stateListener(this.getStateListener())
.onStart(this::onStreamsStart)
.onShutdown(this::onStreamsShutdown)
.build();
}

private StreamsOptions createStreamsOptions() {
return StreamsOptions.builder()
.productive(this.productive)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public StreamsTopicConfig getTopics() {
public ExecutableStreamsApp<T> withEndpoint(final KafkaEndpointConfig endpointConfig) {
final Map<String, Object> kafkaProperties = this.getKafkaProperties(endpointConfig);
final Topology topology = this.createTopology(kafkaProperties);
return new ExecutableStreamsApp<T>(topology, new StreamsConfig(kafkaProperties), this.app);
return new ExecutableStreamsApp<>(topology, new StreamsConfig(kafkaProperties), this.app);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class ExecutableStreamsApp<T extends StreamsApp> {
public StreamsCleanUpRunner createCleanUpRunner() {
final StreamsCleanUpConfigurer configurer = new StreamsCleanUpConfigurer();
this.app.setupCleanUp(configurer);
configurer.registerFinishHook(this.app::close);
return StreamsCleanUpRunner.create(this.topology, this.kafkaProperties, configurer);
}

Expand All @@ -50,7 +49,14 @@ public StreamsRunner createRunner() {
}

public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions) {
final StreamsHooks hooks = this.createHooks();
return this.createRunner(executionOptions, StreamsHooks.builder().build());
}

public StreamsRunner createRunner(final StreamsHooks hooks) {
return this.createRunner(StreamsExecutionOptions.builder().build(), hooks);
}

public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions, final StreamsHooks hooks) {
return StreamsRunner.builder()
.topology(this.topology)
.config(this.kafkaProperties)
Expand All @@ -59,13 +65,4 @@ public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions
.build();
}

private StreamsHooks createHooks() {
return StreamsHooks.builder()
.stateListener(this.app.getStateListener())
.uncaughtExceptionHandler(this.app.getUncaughtExceptionHandler())
.onStart(this.app::onStreamsStart)
.onShutdown(this.app::close)
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,44 +27,11 @@
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KafkaStreams.StateListener;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

public interface StreamsApp extends AutoCloseable {
public interface StreamsApp {
int DEFAULT_PRODUCTIVE_REPLICATION_FACTOR = 3;

/**
* Method to close resources outside of {@link KafkaStreams}. Will be called by default on and on
* transitioning to {@link State#ERROR}.
*/
@Override
default void close() {
//do nothing by default
}

/**
* Create a {@link StateListener} to use for Kafka Streams.
*
* @return {@code StateListener}.
* @see KafkaStreams#setStateListener(StateListener)
*/
default StateListener getStateListener() {
return new NoOpStateListener();
}

/**
* Create a {@link StreamsUncaughtExceptionHandler} to use for Kafka Streams.
*
* @return {@code StreamsUncaughtExceptionHandler}.
* @see KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)
*/
default StreamsUncaughtExceptionHandler getUncaughtExceptionHandler() {
return new DefaultStreamsUncaughtExceptionHandler();
}

/**
* Build the Kafka Streams topology to be run by the app
*
Expand Down Expand Up @@ -116,8 +83,4 @@ default Map<String, Object> createKafkaProperties(final StreamsOptions options)
default void setupCleanUp(final StreamsCleanUpConfigurer cleanUpRunner) {
// do nothing by default
}

default void onStreamsStart(final KafkaStreams streams) {
// do nothing by default
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package com.bakdata.kafka;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.NonNull;
import lombok.Value;
Expand All @@ -40,8 +39,8 @@ public String getAppId() {
return this.kafkaProperties.getString(StreamsConfig.APPLICATION_ID_CONFIG);
}

public List<String> getBoostrapServers() {
return this.kafkaProperties.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
public String getBoostrapServers() {
return String.join(",", this.kafkaProperties.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
}

public Map<String, Object> getKafkaProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class StreamsCleanUpConfigurer implements HasTopicHooks<StreamsCleanUpCon
private final @NonNull Collection<TopicDeletionHookFactory> topicDeletionHooks = new ArrayList<>();
private final @NonNull Collection<Consumer<ImprovedAdminClient>> cleanHooks = new ArrayList<>();
private final @NonNull Collection<Consumer<ImprovedAdminClient>> resetHooks = new ArrayList<>();
private final @NonNull Collection<Runnable> finishHooks = new ArrayList<>();

/**
* Register a hook that is executed whenever a topic has been deleted by the cleanup runner.
Expand All @@ -62,19 +61,13 @@ public StreamsCleanUpConfigurer registerResetHook(final Consumer<ImprovedAdminCl
return this;
}

public StreamsCleanUpConfigurer registerFinishHook(final Runnable action) {
this.finishHooks.add(action);
return this;
}

StreamsCleanUpHooks create(final Map<String, Object> kafkaConfig) {
return StreamsCleanUpHooks.builder()
.topicDeletionHooks(this.topicDeletionHooks.stream()
.map(t -> t.create(kafkaConfig))
.collect(Collectors.toList()))
.cleanHooks(this.cleanHooks)
.resetHooks(this.resetHooks)
.finishHooks(this.finishHooks)
.build();
}

Expand All @@ -83,7 +76,6 @@ static class StreamsCleanUpHooks {
private final @NonNull Collection<TopicDeletionHook> topicDeletionHooks;
private final @NonNull Collection<Consumer<ImprovedAdminClient>> cleanHooks;
private final @NonNull Collection<Consumer<ImprovedAdminClient>> resetHooks;
private final @NonNull Collection<Runnable> finishHooks;

public void runCleanHooks(final ImprovedAdminClient adminClient) {
this.cleanHooks.forEach(hook -> hook.accept(adminClient));
Expand All @@ -93,10 +85,6 @@ public void runResetHooks(final ImprovedAdminClient adminClient) {
this.resetHooks.forEach(hook -> hook.accept(adminClient));
}

public void runFinishHooks() {
this.finishHooks.forEach(Runnable::run);
}

public void runTopicDeletionHooks(final String topic) {
this.topicDeletionHooks.forEach(hook -> hook.deleted(topic));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public static void runResetter(final Collection<String> inputTopics, final Colle
final File tempFile = createTemporaryPropertiesFile(appId, streamsAppConfig.getKafkaProperties());
final ImmutableList.Builder<String> argList = ImmutableList.<String>builder()
.add("--application-id", appId)
.add("--bootstrap-server", String.join(",", streamsAppConfig.getBoostrapServers()))
.add("--bootstrap-server", streamsAppConfig.getBoostrapServers())
.add("--config-file", tempFile.toString());
final Collection<String> existingInputTopics = filterExistingTopics(inputTopics, allTopics);
if (!existingInputTopics.isEmpty()) {
Expand Down Expand Up @@ -159,7 +159,6 @@ public void clean() {
try (final ImprovedAdminClient adminClient = this.createAdminClient()) {
final Task task = new Task(adminClient);
task.cleanAndReset();
this.cleanHooks.runFinishHooks();
waitForCleanUp();
}
}
Expand All @@ -171,7 +170,6 @@ public void reset() {
try (final ImprovedAdminClient adminClient = this.createAdminClient()) {
final Task task = new Task(adminClient);
task.reset();
this.cleanHooks.runFinishHooks();
waitForCleanUp();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ void shouldMirror() {
}

private ConfiguredStreamsApp<MirrorWithNonDefaultSerde> createApp() {
return new ConfiguredStreamsApp<MirrorWithNonDefaultSerde>(new MirrorWithNonDefaultSerde(),
return new ConfiguredStreamsApp<>(new MirrorWithNonDefaultSerde(),
StreamsAppConfiguration.builder()
.topics(StreamsTopicConfig.builder()
.inputTopics(List.of("input"))
.outputTopic("output")
.build())
.build());
.topics(StreamsTopicConfig.builder()
.inputTopics(List.of("input"))
.outputTopic("output")
.build())
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ConfiguredProducerAppTest {
@Test
void shouldPrioritizeConfigCLIParameters() {
final ConfiguredProducerApp<ProducerApp> configuredApp =
new ConfiguredProducerApp<ProducerApp>(new TestProducer(), ProducerAppConfiguration.builder()
new ConfiguredProducerApp<>(new TestProducer(), ProducerAppConfiguration.builder()
.kafkaConfig(Map.of(
"foo", "baz",
"kafka", "streams"
Expand All @@ -55,7 +55,7 @@ void shouldPrioritizeConfigCLIParameters() {
@Test
void shouldSetDefaultAvroSerializerWhenSchemaRegistryUrlIsSet() {
final ConfiguredProducerApp<ProducerApp> configuredApp =
new ConfiguredProducerApp<ProducerApp>(new TestProducer(), ProducerAppConfiguration.builder()
new ConfiguredProducerApp<>(new TestProducer(), ProducerAppConfiguration.builder()
.build());
assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
.brokers("fake")
Expand All @@ -68,7 +68,7 @@ void shouldSetDefaultAvroSerializerWhenSchemaRegistryUrlIsSet() {
@Test
void shouldSetDefaultStringSerializerWhenSchemaRegistryUrlIsNotSet() {
final ConfiguredProducerApp<ProducerApp> configuredApp =
new ConfiguredProducerApp<ProducerApp>(new TestProducer(), ProducerAppConfiguration.builder()
new ConfiguredProducerApp<>(new TestProducer(), ProducerAppConfiguration.builder()
.build());
assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
.brokers("fake")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ConfiguredStreamsAppTest {
@Test
void shouldPrioritizeConfigCLIParameters() {
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<StreamsApp>(new TestApplication(), StreamsAppConfiguration.builder()
new ConfiguredStreamsApp<>(new TestApplication(), StreamsAppConfiguration.builder()
.kafkaConfig(Map.of(
"foo", "baz",
"kafka", "streams"
Expand All @@ -56,7 +56,7 @@ void shouldPrioritizeConfigCLIParameters() {
@Test
void shouldSetDefaultAvroSerdeWhenSchemaRegistryUrlIsSet() {
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<StreamsApp>(new TestApplication(), StreamsAppConfiguration.builder()
new ConfiguredStreamsApp<>(new TestApplication(), StreamsAppConfiguration.builder()
.build());
assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
.brokers("fake")
Expand All @@ -70,7 +70,7 @@ void shouldSetDefaultAvroSerdeWhenSchemaRegistryUrlIsSet() {
@Test
void shouldSetDefaultStringSerdeWhenSchemaRegistryUrlIsNotSet() {
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<StreamsApp>(new TestApplication(), StreamsAppConfiguration.builder()
new ConfiguredStreamsApp<>(new TestApplication(), StreamsAppConfiguration.builder()
.build());
assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
.brokers("fake")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void setup() {
.outputTopic("output")
.build();
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<StreamsApp>(this.app, StreamsAppConfiguration.builder()
new ConfiguredStreamsApp<>(this.app, StreamsAppConfiguration.builder()
.topics(this.topics)
.build());
final Map<String, Object> kafkaProperties = configuredApp.getKafkaProperties(
Expand Down

0 comments on commit cf85b66

Please sign in to comment.