Commit d6c5439: Update
philipp94831 committed Jul 26, 2024
1 parent 7619a0c

Showing 8 changed files with 79 additions and 31 deletions.
---- changed file 1 of 8 ----
@@ -348,6 +348,7 @@ public void close() {
      */
     @Override
     public void stop() {
+        this.cleanUpRunner.close();
         this.app.close();
     }
 }
---- changed file 2 of 8 ----
@@ -27,8 +27,11 @@
 /**
  * Cleans all resources associated with an application
  */
-@FunctionalInterface
-public interface CleanUpRunner {
+public interface CleanUpRunner extends AutoCloseable {
+
+    @Override
+    void close();
+
     /**
      * Clean all resources associated with an application
      */
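With CleanUpRunner now extending AutoCloseable and redeclaring close() without a checked exception, callers can scope a runner with try-with-resources. A minimal caller-side sketch, not part of this commit: createCleanUpRunner() is a placeholder for however the application obtains its runner, and clean() is assumed to be the interface's existing clean-up method, whose declaration the hunk above truncates.

import com.bakdata.kafka.CleanUpRunner;

public final class CleanUpExample {
    public static void main(final String[] args) {
        try (final CleanUpRunner runner = createCleanUpRunner()) {
            runner.clean(); // delete output topics, consumer groups, schemas, ...
        } // close() releases resources held by registered hooks, e.g. Schema Registry clients
    }

    private static CleanUpRunner createCleanUpRunner() {
        // Placeholder: obtain a ProducerCleanUpRunner or StreamsCleanUpRunner from your app.
        throw new UnsupportedOperationException("placeholder");
    }
}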
---- changed file 3 of 8 ----
@@ -24,6 +24,8 @@
 
 package com.bakdata.kafka;
 
+import java.io.Closeable;
+
 /**
  * Interface for performing actions on topics
  * @param <SELF> self for chaining
@@ -41,13 +43,18 @@ public interface HasTopicHooks<SELF> {
     /**
      * Hook for performing actions on topics
      */
-    interface TopicHook {
+    interface TopicHook extends Closeable {
         /**
          * Called when a topic is deleted
          * @param topic name of the topic
          */
         default void deleted(final String topic) {
             // do nothing
         }
+
+        @Override
+        default void close() {
+            // do nothing
+        }
     }
 }
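Because TopicHook now extends Closeable but ships a no-op default close(), existing hook implementations that hold no resources keep compiling unchanged; only hooks that own a client, like the Schema Registry hook later in this diff, need to override close(). A hypothetical hook relying on the default, for illustration only:

import com.bakdata.kafka.HasTopicHooks.TopicHook;

// Hypothetical hook, not from this repository: it reacts to topic deletion but holds no
// resources, so the new no-op default close() is sufficient and needs no override.
final class LoggingTopicHook implements TopicHook {
    @Override
    public void deleted(final String topic) {
        System.out.println("Topic deleted: " + topic);
    }
}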
---- changed file 4 of 8 ----
@@ -24,6 +24,7 @@
 
 package com.bakdata.kafka;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import lombok.NonNull;
@@ -32,7 +33,7 @@
  * Provides configuration options for {@link ProducerCleanUpRunner}
  */
 public class ProducerCleanUpConfiguration
-        implements HasTopicHooks<ProducerCleanUpConfiguration>, HasCleanHook<ProducerCleanUpConfiguration> {
+        implements HasTopicHooks<ProducerCleanUpConfiguration>, HasCleanHook<ProducerCleanUpConfiguration>, Closeable {
     private final @NonNull Collection<TopicHook> topicHooks = new ArrayList<>();
     private final @NonNull Collection<Runnable> cleanHooks = new ArrayList<>();
 
@@ -54,6 +55,11 @@ public ProducerCleanUpConfiguration registerCleanHook(final Runnable hook) {
         return this;
     }
 
+    @Override
+    public void close() {
+        this.topicHooks.forEach(TopicHook::close);
+    }
+
     void runCleanHooks() {
         this.cleanHooks.forEach(Runnable::run);
     }
---- changed file 5 of 8 ----
@@ -69,6 +69,11 @@ public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig to
         return new ProducerCleanUpRunner(topics, kafkaProperties, configuration);
     }
 
+    @Override
+    public void close() {
+        this.cleanHooks.close();
+    }
+
     /**
      * Delete all output topics
      */
---- changed file 6 of 8 ----
@@ -24,6 +24,7 @@
 
 package com.bakdata.kafka;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import lombok.NonNull;
@@ -32,7 +33,7 @@
  * Provides configuration options for {@link StreamsCleanUpRunner}
  */
 public class StreamsCleanUpConfiguration
-        implements HasTopicHooks<StreamsCleanUpConfiguration>, HasCleanHook<StreamsCleanUpConfiguration> {
+        implements HasTopicHooks<StreamsCleanUpConfiguration>, HasCleanHook<StreamsCleanUpConfiguration>, Closeable {
     private final @NonNull Collection<TopicHook> topicHooks = new ArrayList<>();
     private final @NonNull Collection<Runnable> cleanHooks = new ArrayList<>();
     private final @NonNull Collection<Runnable> resetHooks = new ArrayList<>();
@@ -65,6 +66,11 @@ public StreamsCleanUpConfiguration registerResetHook(final Runnable hook) {
         return this;
     }
 
+    @Override
+    public void close() {
+        this.topicHooks.forEach(TopicHook::close);
+    }
+
     void runCleanHooks() {
         this.cleanHooks.forEach(Runnable::run);
     }
---- changed file 7 of 8 ----
@@ -159,6 +159,11 @@ private static Collection<String> filterExistingTopics(final Collection<String>
                 .collect(Collectors.toList());
     }
 
+    @Override
+    public void close() {
+        this.cleanHooks.close();
+    }
+
     /**
      * Clean up your Streams app by resetting the app and deleting the output topics
      * and consumer group.
---- changed file 8 of 8 ----
@@ -30,11 +30,13 @@
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 
@@ -88,32 +90,8 @@ public static SchemaRegistryClient createSchemaRegistryClient(@NonNull final Map
      * @see HasTopicHooks#registerTopicHook(TopicHook)
      */
     public static TopicHook createSchemaRegistryCleanUpHook(final Map<String, Object> kafkaProperties) {
-        final SchemaRegistryClient schemaRegistryClient = createSchemaRegistryClient(kafkaProperties); //TODO close
-        return new TopicHook() {
-            @Override
-            public void deleted(final String topic) {
-                log.info("Resetting Schema Registry for topic '{}'", topic);
-                try {
-                    final Collection<String> allSubjects = schemaRegistryClient.getAllSubjects();
-                    final String keySubject = topic + "-key";
-                    if (allSubjects.contains(keySubject)) {
-                        schemaRegistryClient.deleteSubject(keySubject);
-                        log.info("Cleaned key schema of topic {}", topic);
-                    } else {
-                        log.info("No key schema for topic {} available", topic);
-                    }
-                    final String valueSubject = topic + "-value";
-                    if (allSubjects.contains(valueSubject)) {
-                        schemaRegistryClient.deleteSubject(valueSubject);
-                        log.info("Cleaned value schema of topic {}", topic);
-                    } else {
-                        log.info("No value schema for topic {} available", topic);
-                    }
-                } catch (final IOException | RestClientException e) {
-                    throw new CleanUpException("Could not reset schema registry for topic " + topic, e);
-                }
-            }
-        };
+        final SchemaRegistryClient schemaRegistryClient = createSchemaRegistryClient(kafkaProperties);
+        return new SchemaRegistryTopicHook(schemaRegistryClient);
     }
 
     /**
@@ -129,4 +107,41 @@ public static TopicHook createSchemaRegistryCleanUpHook(final EffectiveAppConfig
         return createSchemaRegistryCleanUpHook(configuration.getKafkaProperties());
     }
 
+    @RequiredArgsConstructor
+    private static class SchemaRegistryTopicHook implements TopicHook {
+        private final @NonNull SchemaRegistryClient schemaRegistryClient;
+
+        @Override
+        public void deleted(final String topic) {
+            log.info("Resetting Schema Registry for topic '{}'", topic);
+            try {
+                final Collection<String> allSubjects = this.schemaRegistryClient.getAllSubjects();
+                final String keySubject = topic + "-key";
+                if (allSubjects.contains(keySubject)) {
+                    this.schemaRegistryClient.deleteSubject(keySubject);
+                    log.info("Cleaned key schema of topic {}", topic);
+                } else {
+                    log.info("No key schema for topic {} available", topic);
+                }
+                final String valueSubject = topic + "-value";
+                if (allSubjects.contains(valueSubject)) {
+                    this.schemaRegistryClient.deleteSubject(valueSubject);
+                    log.info("Cleaned value schema of topic {}", topic);
+                } else {
+                    log.info("No value schema for topic {} available", topic);
+                }
+            } catch (final IOException | RestClientException e) {
+                throw new CleanUpException("Could not reset schema registry for topic " + topic, e);
+            }
+        }
+
+        @Override
+        public void close() {
+            try {
+                this.schemaRegistryClient.close();
+            } catch (final IOException e) {
+                throw new UncheckedIOException("Error closing schema registry client", e);
+            }
+        }
+    }
 }
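Taken together, the commit threads close() through the whole clean-up chain: stop() closes the CleanUpRunner, the producer and streams runners close their registered hooks, the clean-up configuration closes every registered TopicHook, and the new SchemaRegistryTopicHook finally closes its SchemaRegistryClient, resolving the earlier //TODO close. A minimal wiring sketch, not part of this commit, assuming StreamsCleanUpConfiguration can be instantiated directly and using a stand-in hook in place of the real Schema Registry one:

import com.bakdata.kafka.HasTopicHooks.TopicHook;
import com.bakdata.kafka.StreamsCleanUpConfiguration;

public final class HookLifecycleSketch {
    public static void main(final String[] args) {
        // Stand-in for the hook returned by createSchemaRegistryCleanUpHook(...);
        // it prints instead of talking to a real Schema Registry.
        final TopicHook hook = new TopicHook() {
            @Override
            public void deleted(final String topic) {
                System.out.println("would reset schemas of " + topic);
            }

            @Override
            public void close() {
                System.out.println("would close the SchemaRegistryClient");
            }
        };
        try (final StreamsCleanUpConfiguration configuration = new StreamsCleanUpConfiguration()) {
            configuration.registerTopicHook(hook);
            // ... a runner created with this configuration would call the hook while deleting topics ...
        } // close() forwards to TopicHook::close for every registered hook
    }
}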
