
Commit

Create v3
philipp94831 committed Apr 3, 2024
1 parent f675e23 commit d4ef207
Showing 29 changed files with 336 additions and 229 deletions.
Changed file: LargeMessageKafkaApplicationUtils.java
@@ -24,6 +24,7 @@

package com.bakdata.kafka;

import com.bakdata.kafka.HasTopicHooks.TopicDeletionHook;
import java.util.Map;
import java.util.function.Consumer;
import lombok.experimental.UtilityClass;
@@ -41,7 +42,7 @@ public class LargeMessageKafkaApplicationUtils {
* @return hook that cleans up LargeMessage files associated with a topic
* @see StreamsCleanUpRunner#registerTopicDeletionHook(Consumer)
*/
public static Consumer<String> createLargeMessageCleanUpHook(final Map<String, Object> kafkaProperties) {
public static TopicDeletionHook createLargeMessageCleanUpHook(final Map<String, Object> kafkaProperties) {
final AbstractLargeMessageConfig largeMessageConfig = new AbstractLargeMessageConfig(kafkaProperties);
final LargeMessageStoringClient storer = largeMessageConfig.getStorer();
return storer::deleteAllFiles;
@@ -54,22 +55,7 @@ public static Consumer<String> createLargeMessageCleanUpHook(final Map<String, O
* @param cleanUpRunner {@code CleanUpRunner} to register hook on
* @see #createLargeMessageCleanUpHook(Map)
*/
public static void registerLargeMessageCleanUpHook(final Map<String, Object> kafkaProperties,
final StreamsCleanUpRunner cleanUpRunner) {
final Consumer<String> deleteAllFiles = createLargeMessageCleanUpHook(kafkaProperties);
cleanUpRunner.registerTopicDeletionHook(deleteAllFiles);
}

/**
* Register a hook that cleans up LargeMessage files associated with a topic.
*
* @param kafkaProperties Kafka properties to create hook from
* @param cleanUpRunner {@code ProducerCleanUpRunner} to register hook on
* @see #createLargeMessageCleanUpHook(Map)
*/
public static void registerLargeMessageCleanUpHook(final Map<String, Object> kafkaProperties,
final ProducerCleanUpRunner cleanUpRunner) {
final Consumer<String> deleteAllFiles = createLargeMessageCleanUpHook(kafkaProperties);
cleanUpRunner.registerTopicDeletionHook(deleteAllFiles);
public static void registerLargeMessageCleanUpHook(final HasTopicHooks<?> cleanUpRunner) {
cleanUpRunner.registerTopicDeletionHook(LargeMessageKafkaApplicationUtils::createLargeMessageCleanUpHook);
}
}
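
For orientation, here is a minimal sketch of how the reworked hook registration could be wired up from application code. It uses only names that appear in this commit; that StreamsCleanUpConfigurer implements HasTopicHooks, and the package of the imports, are assumptions inferred from the hunks below rather than verified signatures.

import com.bakdata.kafka.LargeMessageKafkaApplicationUtils;
import com.bakdata.kafka.StreamsCleanUpConfigurer;

final class LargeMessageCleanUpSketch {

    // Build a clean-up configurer with the LargeMessage topic-deletion hook registered.
    // The utility now registers a factory (Kafka properties -> TopicDeletionHook)
    // instead of a pre-built Consumer<String>.
    static StreamsCleanUpConfigurer configureLargeMessageCleanUp() {
        final StreamsCleanUpConfigurer configurer = new StreamsCleanUpConfigurer();
        LargeMessageKafkaApplicationUtils.registerLargeMessageCleanUpHook(configurer);
        return configurer;
    }
}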
Changed file: LargeMessageKafkaProducerApplication.java
@@ -30,9 +30,9 @@
public interface LargeMessageKafkaProducerApplication extends ProducerApp {

@Override
default void setupCleanUp(final ProducerCleanUpRunner cleanUpRunner) {
LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(cleanUpRunner.getKafkaProperties());
default void setupCleanUp(final ProducerCleanUpConfigurer cleanUpRunner) {
ProducerApp.super.setupCleanUp(cleanUpRunner);
LargeMessageKafkaApplicationUtils.registerLargeMessageCleanUpHook(cleanUpRunner);
}

}
Changed file: LargeMessageKafkaStreamsApplication.java
@@ -30,10 +30,9 @@
public interface LargeMessageKafkaStreamsApplication extends StreamsApp {

@Override
default void setupCleanUp(final StreamsCleanUpRunner cleanUpRunner) {
LargeMessageKafkaApplicationUtils.registerLargeMessageCleanUpHook(cleanUpRunner.getKafkaProperties(),
cleanUpRunner);
default void setupCleanUp(final StreamsCleanUpConfigurer cleanUpRunner) {
StreamsApp.super.setupCleanUp(cleanUpRunner);
LargeMessageKafkaApplicationUtils.registerLargeMessageCleanUpHook(cleanUpRunner);
}

}
Changed file: KafkaProducerApplication.java
@@ -24,6 +24,7 @@

package com.bakdata.kafka;

import com.bakdata.kafka.ConfiguredProducerApp.ExecutableProducerApp;
import java.util.Map;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -81,17 +82,21 @@ public ProducerTopicConfig createTopicConfig() {
.build();
}

protected abstract ProducerApp createApp();
public abstract ProducerApp createApp();

private ProducerRunner createRunner() {
final ConfiguredProducerApp app = this.createConfiguredApp();
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
return app.createRunner(endpointConfig);
public ProducerRunner createRunner() {
final ExecutableProducerApp executableApp = this.createExecutableApp();
return executableApp.createRunner();
}

public ProducerCleanUpRunner createCleanUpRunner() {
final ExecutableProducerApp executableApp = this.createExecutableApp();
return executableApp.createCleanUpRunner();
}

private ProducerCleanUpRunner createCleanUpRunner() {
private ExecutableProducerApp createExecutableApp() {
final ConfiguredProducerApp app = this.createConfiguredApp();
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
return app.createCleanUpRunner(endpointConfig);
return app.withEndpoint(endpointConfig);
}
}
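
Taken together, these changes split configuration from execution on the producer side: a ConfiguredProducerApp is first bound to an endpoint via withEndpoint, and the resulting ExecutableProducerApp hands out both the runner and the clean-up runner. A rough usage sketch, restricted to types and calls that appear in this commit; how a KafkaEndpointConfig is obtained is not shown in the diff and is taken as given.

import com.bakdata.kafka.ConfiguredProducerApp;
import com.bakdata.kafka.ConfiguredProducerApp.ExecutableProducerApp;
import com.bakdata.kafka.KafkaEndpointConfig;
import com.bakdata.kafka.ProducerCleanUpRunner;
import com.bakdata.kafka.ProducerRunner;

final class ProducerWiringSketch {

    // Mirrors KafkaProducerApplication#createRunner() above: bind the endpoint once,
    // then ask the executable app for a runner.
    static ProducerRunner createRunner(final ConfiguredProducerApp app,
            final KafkaEndpointConfig endpointConfig) {
        final ExecutableProducerApp executableApp = app.withEndpoint(endpointConfig);
        return executableApp.createRunner();
    }

    // Same binding step, different product: the clean-up runner created from the
    // hooks registered in ProducerApp#setupCleanUp.
    static ProducerCleanUpRunner createCleanUpRunner(final ConfiguredProducerApp app,
            final KafkaEndpointConfig endpointConfig) {
        return app.withEndpoint(endpointConfig).createCleanUpRunner();
    }
}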
Changed file: KafkaStreamsApplication.java
@@ -24,6 +24,7 @@

package com.bakdata.kafka;

import com.bakdata.kafka.ConfiguredStreamsApp.ExecutableStreamsApp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -74,15 +75,6 @@ public abstract class KafkaStreamsApplication extends KafkaApplication implement
@ToString.Exclude
private List<StreamsRunner> runners = new ArrayList<>();

public static void main(final String[] args) {
startApplication(new KafkaStreamsApplication() {
@Override
public StreamsApp createApp() {
return null;
}
}, args);
}

/**
* Run the application. If Kafka Streams is run, this method blocks until Kafka Streams has completed shutdown,
* either because it caught an error or the application has received a shutdown event.
@@ -124,16 +116,14 @@ public StreamsExecutionOptions createExecutionOptions() {
}

public StreamsRunner createRunner() {
final ConfiguredStreamsApp configuredStreamsApp = this.createConfiguredApp();
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
final ExecutableStreamsApp executableStreamsApp = this.createExecutableApp();
final StreamsExecutionOptions executionOptions = this.createExecutionOptions();
return configuredStreamsApp.createRunner(endpointConfig, executionOptions);
return executableStreamsApp.createRunner(executionOptions);
}

public StreamsCleanUpRunner createCleanUpRunner() {
final ConfiguredStreamsApp configuredStreamsApp = this.createConfiguredApp();
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
return configuredStreamsApp.createCleanUpRunner(endpointConfig);
final ExecutableStreamsApp executableApp = this.createExecutableApp();
return executableApp.createCleanUpRunner();
}

public ConfiguredStreamsApp createConfiguredApp() {
@@ -165,6 +155,12 @@ public StreamsTopicConfig createTopicConfig() {
.build();
}

public ExecutableStreamsApp createExecutableApp() {
final ConfiguredStreamsApp configuredStreamsApp = this.createConfiguredApp();
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
return configuredStreamsApp.withEndpoint(endpointConfig);
}

private StreamsOptions createStreamsOptions() {
return StreamsOptions.builder()
.productive(this.productive)
Changed file: SimpleKafkaProducerApplication.java
@@ -33,7 +33,7 @@ public class SimpleKafkaProducerApplication extends KafkaProducerApplication {
private final @NonNull Supplier<ProducerApp> appFactory;

@Override
protected ProducerApp createApp() {
public ProducerApp createApp() {
return this.appFactory.get();
}
}
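
With createApp now public, the supplier-based wrapper remains the simplest way to bootstrap a producer application. A hypothetical main method, modeled on the streams test code below; the Supplier-based constructor is assumed from the appFactory field above, and any further ProducerApp methods a real implementation must provide are omitted.

import com.bakdata.kafka.KafkaApplication;
import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerBuilder;
import com.bakdata.kafka.SimpleKafkaProducerApplication;

final class MyProducerBootstrap {

    public static void main(final String[] args) {
        // startApplication is used the same way in the streams tests below.
        KafkaApplication.startApplication(new SimpleKafkaProducerApplication(MyProducerApp::new), args);
    }

    // Minimal stand-in for a real producer app; run(ProducerBuilder) is the entry point
    // invoked by ProducerRunner in this commit.
    private static final class MyProducerApp implements ProducerApp {
        @Override
        public void run(final ProducerBuilder builder) {
            // produce records here
        }
    }
}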
Changed file: Kafka Streams application tests (file name not shown)
@@ -56,7 +56,7 @@ void shouldExitWithSuccessCode() {
public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) {
public void buildTopology(final TopologyBuilder builder) {
throw new UnsupportedOperationException();
}

@@ -84,7 +84,7 @@ public void run() {
void shouldExitWithErrorCodeOnRunError() {
KafkaApplication.startApplication(new SimpleKafkaStreamsApplication(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) {
public void buildTopology(final TopologyBuilder builder) {
throw new UnsupportedOperationException();
}

@@ -108,7 +108,7 @@ void shouldExitWithErrorCodeOnCleanupError() {
public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) {
public void buildTopology(final TopologyBuilder builder) {
throw new UnsupportedOperationException();
}

@@ -140,7 +140,7 @@ void shouldExitWithErrorCodeOnMissingBrokerParameter() {
public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) {
public void buildTopology(final TopologyBuilder builder) {
throw new UnsupportedOperationException();
}

@@ -169,7 +169,7 @@ void shouldExitWithErrorInTopology() throws InterruptedException {
try (final EmbeddedKafkaCluster kafkaCluster = provisionWith(defaultClusterConfig());
final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) {
public void buildTopology(final TopologyBuilder builder) {
builder.streamInput(Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
.peek((k, v) -> {
throw new RuntimeException();
@@ -202,7 +202,7 @@ void shouldExitWithSuccessCodeOnShutdown() throws InterruptedException {
try (final EmbeddedKafkaCluster kafkaCluster = provisionWith(defaultClusterConfig());
final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) {
public void buildTopology(final TopologyBuilder builder) {
builder.streamInput(Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
.to(builder.getTopics().getOutputTopic());
}
@@ -242,7 +242,7 @@ void shouldExitWithErrorOnCleanupError() {
public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) {
public void buildTopology(final TopologyBuilder builder) {
throw new UnsupportedOperationException();
}

@@ -268,7 +268,7 @@ void shouldParseArguments() {
public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) {
public void buildTopology(final TopologyBuilder builder) {
throw new UnsupportedOperationException();
}

Changed file: Mirror.java
@@ -33,7 +33,7 @@
@NoArgsConstructor
public class Mirror implements StreamsApp {
@Override
public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) {
public void buildTopology(final TopologyBuilder builder) {
final KStream<String, String> input = builder.streamInput();
input.to(builder.getTopics().getOutputTopic());
}
Changed file: WordCount.java
@@ -40,7 +40,7 @@
public class WordCount implements StreamsApp {

@Override
public void buildTopology(final TopologyBuilder builder, final boolean cleanUp) {
public void buildTopology(final TopologyBuilder builder) {
final KStream<String, String> textLines = builder.streamInput();

final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
Changed file: ConfiguredProducerApp.java
@@ -87,21 +87,31 @@ public Map<String, Object> getKafkaProperties(final KafkaEndpointConfig endpoint
return kafkaConfig;
}

public ProducerCleanUpRunner createCleanUpRunner(final KafkaEndpointConfig endpointConfig) {
final Map<String, Object> kafkaProperties = this.getKafkaProperties(endpointConfig);
final ProducerCleanUpRunner cleanUpRunner =
new ProducerCleanUpRunner(this.configuration.getTopics(), kafkaProperties);
public ExecutableProducerApp withEndpoint(final KafkaEndpointConfig endpointConfig) {
return new ExecutableProducerApp(endpointConfig);
}

@RequiredArgsConstructor
public class ExecutableProducerApp {
private final @NonNull KafkaEndpointConfig endpointConfig;

this.app.setupCleanUp(cleanUpRunner);
return cleanUpRunner;
public ProducerCleanUpRunner createCleanUpRunner() {
final Map<String, Object> kafkaProperties =
ConfiguredProducerApp.this.getKafkaProperties(this.endpointConfig);
final ProducerCleanUpConfigurer configurer = new ProducerCleanUpConfigurer();
ConfiguredProducerApp.this.app.setupCleanUp(configurer);
return ProducerCleanUpRunner.create(ConfiguredProducerApp.this.configuration.getTopics(), kafkaProperties,
configurer);
}

public ProducerRunner createRunner(final KafkaEndpointConfig endpointConfig) {
final Map<String, Object> kafkaProperties = this.getKafkaProperties(endpointConfig);
public ProducerRunner createRunner() {
final Map<String, Object> kafkaProperties =
ConfiguredProducerApp.this.getKafkaProperties(this.endpointConfig);
final ProducerBuilder producerBuilder = ProducerBuilder.builder()
.topics(this.configuration.getTopics())
.topics(ConfiguredProducerApp.this.configuration.getTopics())
.kafkaProperties(kafkaProperties)
.build();
return new ProducerRunner(() -> this.app.run(producerBuilder));
return new ProducerRunner(() -> ConfiguredProducerApp.this.app.run(producerBuilder));
}
}
}
Changed file: ConfiguredStreamsApp.java
@@ -97,30 +97,8 @@ public Topology createTopology(final Map<String, Object> kafkaProperties) {
return this.createTopology(kafkaProperties, false);
}

public StreamsCleanUpRunner createCleanUpRunner(final KafkaEndpointConfig endpointConfig) {
final Map<String, Object> kafkaProperties = this.getKafkaProperties(endpointConfig);
final Topology topology = this.createTopology(kafkaProperties, true);
final StreamsCleanUpRunner cleanUpRunner = StreamsCleanUpRunner.create(topology, kafkaProperties);
cleanUpRunner.registerFinishHook(this.app::close);

this.app.setupCleanUp(cleanUpRunner);
return cleanUpRunner;
}

public StreamsRunner createRunner(final KafkaEndpointConfig endpointConfig) {
return this.createRunner(endpointConfig, StreamsExecutionOptions.builder().build());
}

public StreamsRunner createRunner(final KafkaEndpointConfig endpointConfig,
final StreamsExecutionOptions executionOptions) {
final Map<String, Object> kafkaProperties = this.getKafkaProperties(endpointConfig);
final Topology topology = this.createTopology(kafkaProperties);
return StreamsRunner.builder()
.topology(topology)
.config(new StreamsConfig(kafkaProperties))
.executionOptions(executionOptions)
.hooks(this.createHooks())
.build();
public ExecutableStreamsApp withEndpoint(final KafkaEndpointConfig endpointConfig) {
return new ExecutableStreamsApp(endpointConfig);
}

/**
@@ -134,18 +112,52 @@ private Topology createTopology(final Map<String, Object> kafkaProperties, final
final TopologyBuilder topologyBuilder = TopologyBuilder.builder()
.topics(this.configuration.getTopics())
.kafkaProperties(kafkaProperties)
.cleanUp(cleanUp)
.build();
this.app.buildTopology(topologyBuilder, cleanUp);
this.app.buildTopology(topologyBuilder);
return topologyBuilder.build();
}

private StreamsHooks createHooks() {
return StreamsHooks.builder()
.stateListener(this.app.getStateListener())
.uncaughtExceptionHandler(this.app.getUncaughtExceptionHandler())
.onStart(this.app::onStreamsStart)
.onShutdown(this.app::close)
.build();
@RequiredArgsConstructor
public class ExecutableStreamsApp {

private final @NonNull KafkaEndpointConfig endpointConfig;

public StreamsCleanUpRunner createCleanUpRunner() {
final Map<String, Object> kafkaProperties =
ConfiguredStreamsApp.this.getKafkaProperties(this.endpointConfig);
final Topology topology = ConfiguredStreamsApp.this.createTopology(kafkaProperties, true);
final StreamsCleanUpConfigurer configurer = new StreamsCleanUpConfigurer();
ConfiguredStreamsApp.this.app.setupCleanUp(configurer);
configurer.registerFinishHook(ConfiguredStreamsApp.this.app::close);
return StreamsCleanUpRunner.create(topology, kafkaProperties, configurer);
}

public StreamsRunner createRunner() {
return this.createRunner(StreamsExecutionOptions.builder().build());
}

public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions) {
final Map<String, Object> kafkaProperties =
ConfiguredStreamsApp.this.getKafkaProperties(this.endpointConfig);
final Topology topology = ConfiguredStreamsApp.this.createTopology(kafkaProperties);
return StreamsRunner.builder()
.topology(topology)
.config(new StreamsConfig(kafkaProperties))
.executionOptions(executionOptions)
.hooks(this.createHooks())
.build();
}

private StreamsHooks createHooks() {
return StreamsHooks.builder()
.stateListener(ConfiguredStreamsApp.this.app.getStateListener())
.uncaughtExceptionHandler(ConfiguredStreamsApp.this.app.getUncaughtExceptionHandler())
.onStart(ConfiguredStreamsApp.this.app::onStreamsStart)
.onShutdown(ConfiguredStreamsApp.this.app::close)
.build();
}

}

}
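
The streams side follows the same pattern: ConfiguredStreamsApp#withEndpoint yields an ExecutableStreamsApp that owns runner and clean-up-runner creation, with execution options supplied when the runner is created. A rough sketch, again limited to calls visible in this commit; the nested-class import mirrors the one added for ExecutableStreamsApp above.

import com.bakdata.kafka.ConfiguredStreamsApp;
import com.bakdata.kafka.ConfiguredStreamsApp.ExecutableStreamsApp;
import com.bakdata.kafka.KafkaEndpointConfig;
import com.bakdata.kafka.StreamsCleanUpRunner;
import com.bakdata.kafka.StreamsExecutionOptions;
import com.bakdata.kafka.StreamsRunner;

final class StreamsWiringSketch {

    // Mirrors KafkaStreamsApplication#createRunner() earlier in this commit.
    static StreamsRunner createRunner(final ConfiguredStreamsApp app,
            final KafkaEndpointConfig endpointConfig) {
        final ExecutableStreamsApp executableApp = app.withEndpoint(endpointConfig);
        return executableApp.createRunner(StreamsExecutionOptions.builder().build());
    }

    // Clean-up goes through the same executable app; the configurer set up in
    // StreamsApp#setupCleanUp is applied inside createCleanUpRunner().
    static StreamsCleanUpRunner createCleanUpRunner(final ConfiguredStreamsApp app,
            final KafkaEndpointConfig endpointConfig) {
        return app.withEndpoint(endpointConfig).createCleanUpRunner();
    }
}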