From 9ce7863247d9795b66f06bfc048d5bcfe0122197 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 24 Jun 2024 15:45:10 +0200 Subject: [PATCH] Expose RunnableApp --- .../java/com/bakdata/kafka/ImprovedStreamsConfig.java | 11 ++++++----- .../java/com/bakdata/kafka/StreamsCleanUpRunner.java | 4 ++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedStreamsConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedStreamsConfig.java index 0b92731d..70f9fa03 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedStreamsConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedStreamsConfig.java @@ -36,7 +36,8 @@ @Value public class ImprovedStreamsConfig { - @NonNull StreamsConfig kafkaProperties; + @NonNull + StreamsConfig streamsConfig; /** * Get the application id of the underlying {@link StreamsConfig} @@ -44,7 +45,7 @@ public class ImprovedStreamsConfig { * @see StreamsConfig#APPLICATION_ID_CONFIG */ public String getAppId() { - return this.kafkaProperties.getString(StreamsConfig.APPLICATION_ID_CONFIG); + return this.streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG); } /** @@ -53,7 +54,7 @@ public String getAppId() { * @see StreamsConfig#BOOTSTRAP_SERVERS_CONFIG */ public String getBoostrapServers() { - return String.join(",", this.kafkaProperties.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)); + return String.join(",", this.streamsConfig.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)); } /** @@ -61,7 +62,7 @@ public String getBoostrapServers() { * @return Kafka configs * @see StreamsConfig#originals() */ - public Map getKafkaProperties() { - return Collections.unmodifiableMap(this.kafkaProperties.originals()); + public Map getStreamsConfig() { + return Collections.unmodifiableMap(this.streamsConfig.originals()); } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java index 7246d138..61f6e13d 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java @@ -99,7 +99,7 @@ public static void runResetter(final Collection inputTopics, final Colle final Collection allTopics, final ImprovedStreamsConfig streamsAppConfig) { // StreamsResetter's internal AdminClient can only be configured with a properties file final String appId = streamsAppConfig.getAppId(); - final File tempFile = createTemporaryPropertiesFile(appId, streamsAppConfig.getKafkaProperties()); + final File tempFile = createTemporaryPropertiesFile(appId, streamsAppConfig.getStreamsConfig()); final ImmutableList.Builder argList = ImmutableList.builder() .add("--application-id", appId) .add("--bootstrap-server", streamsAppConfig.getBoostrapServers()) @@ -183,7 +183,7 @@ public void reset() { } private Map getKafkaProperties() { - return this.config.getKafkaProperties(); + return this.config.getStreamsConfig(); } private ImprovedAdminClient createAdminClient() {