Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 19, 2024
1 parent 38aa137 commit 96e2dad
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,24 @@
*/
public interface LargeMessageProducerApp extends ProducerApp {

static ProducerCleanUpConfiguration registerLargeMessageCleanUpHook(final ProducerCleanUpConfiguration configurer,
final EffectiveAppConfiguration<?> configuration) {
return configurer.registerTopicHook(
/**
* Register a hook that cleans up LargeMessage files associated with a topic
* @param cleanUpConfiguration Configuration to register hook on
* @param configuration Configuration to create hook from
* @return {@code ProducerCleanUpConfiguration} with registered topic hook
* @see LargeMessageKafkaApplicationUtils#createLargeMessageCleanUpHook(EffectiveAppConfiguration)
*/
static ProducerCleanUpConfiguration registerLargeMessageCleanUpHook(
final ProducerCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration<?> configuration) {
return cleanUpConfiguration.registerTopicHook(
LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration));
}

@Override
default ProducerCleanUpConfiguration setupCleanUp(
final EffectiveAppConfiguration<ProducerTopicConfig> configuration) {
final ProducerCleanUpConfiguration configurer = ProducerApp.super.setupCleanUp(configuration);
return registerLargeMessageCleanUpHook(configurer, configuration);
final ProducerCleanUpConfiguration cleanUpConfiguration = ProducerApp.super.setupCleanUp(configuration);
return registerLargeMessageCleanUpHook(cleanUpConfiguration, configuration);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,24 @@
*/
public interface LargeMessageStreamsApp extends StreamsApp {

static StreamsCleanUpConfiguration registerLargeMessageCleanUpHook(final StreamsCleanUpConfiguration configurer,
final EffectiveAppConfiguration<?> configuration) {
return configurer.registerTopicHook(
/**
* Register a hook that cleans up LargeMessage files associated with a topic
* @param cleanUpConfiguration Configuration to register hook on
* @param configuration Configuration to create hook from
* @return {@code StreamsCleanUpConfiguration} with registered topic hook
* @see LargeMessageKafkaApplicationUtils#createLargeMessageCleanUpHook(EffectiveAppConfiguration)
*/
static StreamsCleanUpConfiguration registerLargeMessageCleanUpHook(
final StreamsCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration<?> configuration) {
return cleanUpConfiguration.registerTopicHook(
LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration));
}

@Override
default StreamsCleanUpConfiguration setupCleanUp(
final EffectiveAppConfiguration<StreamsTopicConfig> configuration) {
final StreamsCleanUpConfiguration configurer = StreamsApp.super.setupCleanUp(configuration);
return registerLargeMessageCleanUpHook(configurer, configuration);
final StreamsCleanUpConfiguration cleanUpConfiguration = StreamsApp.super.setupCleanUp(configuration);
return registerLargeMessageCleanUpHook(cleanUpConfiguration, configuration);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ public final CA createConfiguredApp(final boolean cleanUp) {
*/
public final AppConfiguration<T> createConfiguration() {
final T topics = this.createTopicConfig();
final Map<String, String> kafkaConfig = this.getKafkaConfig();
return new AppConfiguration<>(topics, kafkaConfig);
return new AppConfiguration<>(topics, this.kafkaConfig);
}

/**
Expand Down

0 comments on commit 96e2dad

Please sign in to comment.