Skip to content

Commit

Permalink
Expose RunnableApp
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jun 24, 2024
1 parent 98334f4 commit 9ce7863
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@
@Value
public class ImprovedStreamsConfig {

@NonNull StreamsConfig kafkaProperties;
@NonNull
StreamsConfig streamsConfig;

/**
* Get the application id of the underlying {@link StreamsConfig}
* @return application id
* @see StreamsConfig#APPLICATION_ID_CONFIG
*/
public String getAppId() {
return this.kafkaProperties.getString(StreamsConfig.APPLICATION_ID_CONFIG);
return this.streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG);
}

/**
Expand All @@ -53,15 +54,15 @@ public String getAppId() {
* @see StreamsConfig#BOOTSTRAP_SERVERS_CONFIG
*/
public String getBoostrapServers() {
return String.join(",", this.kafkaProperties.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
return String.join(",", this.streamsConfig.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
}

/**
* Get all configs of the underlying {@link StreamsConfig}
* @return Kafka configs
* @see StreamsConfig#originals()
*/
public Map<String, Object> getKafkaProperties() {
return Collections.unmodifiableMap(this.kafkaProperties.originals());
public Map<String, Object> getStreamsConfig() {
return Collections.unmodifiableMap(this.streamsConfig.originals());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public static void runResetter(final Collection<String> inputTopics, final Colle
final Collection<String> allTopics, final ImprovedStreamsConfig streamsAppConfig) {
// StreamsResetter's internal AdminClient can only be configured with a properties file
final String appId = streamsAppConfig.getAppId();
final File tempFile = createTemporaryPropertiesFile(appId, streamsAppConfig.getKafkaProperties());
final File tempFile = createTemporaryPropertiesFile(appId, streamsAppConfig.getStreamsConfig());
final ImmutableList.Builder<String> argList = ImmutableList.<String>builder()
.add("--application-id", appId)
.add("--bootstrap-server", streamsAppConfig.getBoostrapServers())
Expand Down Expand Up @@ -183,7 +183,7 @@ public void reset() {
}

private Map<String, Object> getKafkaProperties() {
return this.config.getKafkaProperties();
return this.config.getStreamsConfig();
}

private ImprovedAdminClient createAdminClient() {
Expand Down

0 comments on commit 9ce7863

Please sign in to comment.