Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 3, 2024
1 parent fa9a1f1 commit 00380b1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package com.bakdata.kafka;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.NonNull;
import lombok.Value;
Expand All @@ -39,8 +40,8 @@ public String getAppId() {
return this.kafkaProperties.getString(StreamsConfig.APPLICATION_ID_CONFIG);
}

public String getBoostrapServers() {
return this.kafkaProperties.getString(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
public List<String> getBoostrapServers() {
return this.kafkaProperties.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
}

public Map<String, Object> getKafkaProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public final class StreamsCleanUpRunner {
private final @NonNull StreamsAppConfig appConfig;
private final @NonNull StreamsCleanUpHooks cleanHooks;

public static StreamsCleanUpRunner create(final @NonNull Topology topology,
final @NonNull StreamsConfig kafkaProperties) {
return create(topology, kafkaProperties, new StreamsCleanUpConfigurer());
}

public static StreamsCleanUpRunner create(final @NonNull Topology topology,
final @NonNull StreamsConfig kafkaProperties, final @NonNull StreamsCleanUpConfigurer cleanHooks) {
final StreamsAppConfig streamsAppConfig = new StreamsAppConfig(kafkaProperties);
Expand All @@ -86,7 +91,7 @@ public static void runResetter(final Collection<String> inputTopics, final Colle
final File tempFile = createTemporaryPropertiesFile(appId, streamsAppConfig.getKafkaProperties());
final ImmutableList.Builder<String> argList = ImmutableList.<String>builder()
.add("--application-id", appId)
.add("--bootstrap-servers", streamsAppConfig.getBoostrapServers())
.add("--bootstrap-server", String.join(",", streamsAppConfig.getBoostrapServers()))
.add("--config-file", tempFile.toString());
final Collection<String> existingInputTopics = filterExistingTopics(inputTopics, allTopics);
if (!existingInputTopics.isEmpty()) {
Expand Down Expand Up @@ -129,10 +134,6 @@ static Properties toStringBasedProperties(final Map<String, Object> config) {
return parsedProperties;
}

public Map<String, Object> getKafkaProperties() {
return this.appConfig.getKafkaProperties();
}

private static Collection<String> filterExistingTopics(final Collection<String> topics,
final Collection<String> allTopics) {
return topics.stream()
Expand All @@ -146,6 +147,10 @@ private static Collection<String> filterExistingTopics(final Collection<String>
.collect(Collectors.toList());
}

public Map<String, Object> getKafkaProperties() {
return this.appConfig.getKafkaProperties();
}

/**
* Clean up your Streams app by resetting the app, deleting local state and deleting the output topics
* and consumer group.
Expand Down

0 comments on commit 00380b1

Please sign in to comment.