Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 8, 2024
1 parent bdb4158 commit ad03342
Show file tree
Hide file tree
Showing 20 changed files with 80 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class LargeMessageKafkaApplicationUtils {
*
* @param kafkaProperties Kafka properties to create hook from
* @return hook that cleans up LargeMessage files associated with a topic
* @see HasTopicHooks#registerTopicHook(HookFactory)
* @see HasTopicHooks#registerTopicHook(TopicHook)
*/
public static TopicHook createLargeMessageCleanUpHook(final Map<String, Object> kafkaProperties) {
final AbstractLargeMessageConfig largeMessageConfig = new AbstractLargeMessageConfig(kafkaProperties);
Expand All @@ -52,15 +52,4 @@ public void deleted(final String topic) {
};
}

/**
* Register a hook that cleans up LargeMessage files associated with a topic.
*
* @param cleanUpRunner {@code CleanUpRunner} to register hook on
* @return self for chaining
* @see #createLargeMessageCleanUpHook(Map)
*/
public static <T> T registerLargeMessageCleanUpHook(final HasTopicHooks<T> cleanUpRunner) {
return cleanUpRunner.registerTopicHook(
LargeMessageKafkaApplicationUtils::createLargeMessageCleanUpHook);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
public interface LargeMessageProducerApp extends ProducerApp {

@Override
default ProducerCleanUpConfiguration setupCleanUp() {
final ProducerCleanUpConfiguration configurer = ProducerApp.super.setupCleanUp();
return LargeMessageKafkaApplicationUtils.registerLargeMessageCleanUpHook(configurer);
default ProducerCleanUpConfiguration setupCleanUp(final ProducerSetupConfiguration configuration) {
final ProducerCleanUpConfiguration configurer = ProducerApp.super.setupCleanUp(configuration);
return configurer.registerTopicHook(
LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration.getKafkaProperties()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
public interface LargeMessageStreamsApp extends StreamsApp {

@Override
default StreamsCleanUpConfiguration setupCleanUp() {
final StreamsCleanUpConfiguration configurer = StreamsApp.super.setupCleanUp();
return LargeMessageKafkaApplicationUtils.registerLargeMessageCleanUpHook(configurer);
default StreamsCleanUpConfiguration setupCleanUp(final StreamsSetupConfiguration configuration) {
final StreamsCleanUpConfiguration configurer = StreamsApp.super.setupCleanUp(configuration);
return configurer.registerTopicHook(
LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration.getKafkaProperties()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,16 @@ public StreamsTopicConfig getTopics() {
public ExecutableStreamsApp<T> withEndpoint(final KafkaEndpointConfig endpointConfig) {
final Map<String, Object> kafkaProperties = this.getKafkaProperties(endpointConfig);
final Topology topology = this.createTopology(kafkaProperties);
final StreamsSetupConfiguration setupConfiguration = StreamsSetupConfiguration.builder()
.kafkaProperties(kafkaProperties)
.topics(this.getTopics())
.build();
return ExecutableStreamsApp.<T>builder()
.topology(topology)
.config(new StreamsConfig(kafkaProperties))
.app(this.app)
.setup(() -> this.setupApp(kafkaProperties))
.setup(() -> this.app.setup(setupConfiguration))
.setupCleanup(() -> this.app.setupCleanUp(setupConfiguration))
.build();
}

Expand All @@ -174,12 +179,4 @@ public void close() {
this.app.close();
}

private void setupApp(final Map<String, Object> kafkaProperties) {
final StreamsSetupConfiguration setupConfiguration = StreamsSetupConfiguration.builder()
.kafkaProperties(kafkaProperties)
.topics(this.getTopics())
.build();
this.app.setup(setupConfiguration);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public class ExecutableProducerApp<T extends ProducerApp>
*/
@Override
public ProducerCleanUpRunner createCleanUpRunner() {
final ProducerCleanUpConfiguration configurer = this.app.setupCleanUp();
final ProducerSetupConfiguration configuration = this.createSetupConfiguration();
final ProducerCleanUpConfiguration configurer = this.app.setupCleanUp(configuration);
return ProducerCleanUpRunner.create(this.topics, this.kafkaProperties, configurer);
}

Expand All @@ -67,7 +68,8 @@ public ProducerRunner createRunner(final ProducerExecutionOptions options) {
.topics(this.topics)
.kafkaProperties(this.kafkaProperties)
.build();
this.setup();
final ProducerSetupConfiguration configuration = this.createSetupConfiguration();
this.app.setup(configuration);
return new ProducerRunner(this.app.buildRunnable(producerBuilder));
}

Expand All @@ -76,11 +78,14 @@ public void close() {
this.app.close();
}

private void setup() {
final ProducerSetupConfiguration configuration = ProducerSetupConfiguration.builder()
private ProducerSetupConfiguration createSetupConfiguration() {
return ProducerSetupConfiguration.builder()
.topics(this.topics)
.kafkaProperties(this.kafkaProperties)
.build();
}

private void setup(final ProducerSetupConfiguration configuration) {
this.app.setup(configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.bakdata.kafka;

import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -46,16 +47,16 @@ public class ExecutableStreamsApp<T extends StreamsApp>
private final @NonNull StreamsConfig config;
@Getter
private final @NonNull T app;
@Builder.Default
private final @NonNull Runnable setup = () -> {};
private final @NonNull Runnable setup;
private final @NonNull Supplier<StreamsCleanUpConfiguration> setupCleanup;

/**
* Create {@code StreamsCleanUpRunner} in order to clean application
* @return {@code StreamsCleanUpRunner}
*/
@Override
public StreamsCleanUpRunner createCleanUpRunner() {
final StreamsCleanUpConfiguration configurer = this.app.setupCleanUp();
final StreamsCleanUpConfiguration configurer = this.setupCleanup.get();
return StreamsCleanUpRunner.create(this.topology, this.config, configurer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
public interface HasCleanHook<SELF> {
/**
* Register a hook that is invoked when cleaning apps
* @param hookFactory factory to create hook from
* @param hook factory to create hook from
* @return self for chaining
*/
SELF registerCleanHook(HookFactory<Runnable> hookFactory);
SELF registerCleanHook(Runnable hook);

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@
public interface HasTopicHooks<SELF> {
/**
* Register a hook that is invoked when performing actions on topics
* @param hookFactory factory to create {@link TopicHook} from
*
* @param hook Action to run. Topic is passed as parameter
* @return self for chaining
*/
SELF registerTopicHook(HookFactory<TopicHook> hookFactory);
SELF registerTopicHook(TopicHook hook);

/**
* Hook for performing actions on topics
Expand Down
40 changes: 0 additions & 40 deletions streams-bootstrap/src/main/java/com/bakdata/kafka/HookFactory.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ default Map<String, Object> createKafkaProperties() {

/**
* Configure clean up behavior
* @param configuration provides all runtime application configurations
* @return {@code ProducerCleanUpConfiguration}
* @see ProducerCleanUpRunner
*/
default ProducerCleanUpConfiguration setupCleanUp() {
default ProducerCleanUpConfiguration setupCleanUp(final ProducerSetupConfiguration configuration) {
return new ProducerCleanUpConfiguration();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,66 +26,39 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.NonNull;

/**
* Provides configuration options for {@link ProducerCleanUpRunner}
*/
public class ProducerCleanUpConfiguration
implements HasTopicHooks<ProducerCleanUpConfiguration>, HasCleanHook<ProducerCleanUpConfiguration> {
private final @NonNull Collection<HookFactory<TopicHook>> topicDeletionHooks = new ArrayList<>();
private final @NonNull Collection<HookFactory<Runnable>> cleanHooks = new ArrayList<>();
private final @NonNull Collection<TopicHook> topicHooks = new ArrayList<>();
private final @NonNull Collection<Runnable> cleanHooks = new ArrayList<>();

/**
* Register a hook that is executed whenever a topic has been deleted by the cleanup runner.
*
* @param hookFactory Action to run. Topic is passed as parameter
* @return this for chaining
* @see ProducerCleanUpRunner
*/
@Override
public ProducerCleanUpConfiguration registerTopicHook(final HookFactory<TopicHook> hookFactory) {
this.topicDeletionHooks.add(hookFactory);
public ProducerCleanUpConfiguration registerTopicHook(final TopicHook hook) {
this.topicHooks.add(hook);
return this;
}

/**
* Register an action that is executed after {@link ProducerCleanUpRunner#clean()} has finished
* @param action Action to run
* @return this for chaining
*/
@Override
public ProducerCleanUpConfiguration registerCleanHook(final HookFactory<Runnable> action) {
this.cleanHooks.add(action);
public ProducerCleanUpConfiguration registerCleanHook(final Runnable hook) {
this.cleanHooks.add(hook);
return this;
}

ProducerCleanUpHooks create(final Map<String, Object> kafkaConfig) {
return ProducerCleanUpHooks.builder()
.topicHooks(this.topicDeletionHooks.stream()
.map(t -> t.create(kafkaConfig))
.collect(Collectors.toList()))
.cleanHooks(this.cleanHooks.stream()
.map(c -> c.create(kafkaConfig))
.collect(Collectors.toList()))
.build();
void runCleanHooks() {
this.cleanHooks.forEach(Runnable::run);
}

@Builder(access = AccessLevel.PRIVATE)
static class ProducerCleanUpHooks {
private final @NonNull Collection<TopicHook> topicHooks;
private final @NonNull Collection<Runnable> cleanHooks;

void runCleanHooks() {
this.cleanHooks.forEach(Runnable::run);
}

void runTopicDeletionHooks(final String topic) {
this.topicHooks.forEach(hook -> hook.deleted(topic));
}
void runTopicDeletionHooks(final String topic) {
this.topicHooks.forEach(hook -> hook.deleted(topic));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.bakdata.kafka;

import com.bakdata.kafka.ProducerCleanUpConfiguration.ProducerCleanUpHooks;
import com.bakdata.kafka.util.ImprovedAdminClient;
import java.util.Map;
import lombok.AccessLevel;
Expand All @@ -42,7 +41,7 @@
public final class ProducerCleanUpRunner implements CleanUpRunner {
private final @NonNull ProducerTopicConfig topics;
private final @NonNull Map<String, Object> kafkaProperties;
private final @NonNull ProducerCleanUpHooks cleanHooks;
private final @NonNull ProducerCleanUpConfiguration cleanHooks;

/**
* Create a new {@code ProducerCleanUpRunner} with default {@link ProducerCleanUpConfiguration}
Expand All @@ -67,7 +66,7 @@ public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig to
public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig topics,
@NonNull final Map<String, Object> kafkaProperties,
@NonNull final ProducerCleanUpConfiguration configuration) {
return new ProducerCleanUpRunner(topics, kafkaProperties, configuration.create(kafkaProperties));
return new ProducerCleanUpRunner(topics, kafkaProperties, configuration);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
/**
* Configuration for setting up a {@link ProducerApp}
* @see ProducerApp#setup(ProducerSetupConfiguration)
* @see ProducerApp#setupCleanUp(ProducerSetupConfiguration)
*/
@Builder
@Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ default Map<String, Object> createKafkaProperties() {

/**
* Configure clean up behavior
* @param configuration provides all runtime application configurations
* @return {@code StreamsCleanUpConfiguration}
* @see StreamsCleanUpRunner
*/
default StreamsCleanUpConfiguration setupCleanUp() {
default StreamsCleanUpConfiguration setupCleanUp(final StreamsSetupConfiguration configuration) {
return new StreamsCleanUpConfiguration();
}

Expand Down
Loading

0 comments on commit ad03342

Please sign in to comment.