Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jun 12, 2024
1 parent 66f74e9 commit 761df8a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.StateListener;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import picocli.CommandLine;
import picocli.CommandLine.Command;
Expand Down Expand Up @@ -159,8 +160,9 @@ protected StreamsUncaughtExceptionHandler createUncaughtExceptionHandler() {
/**
* Called after starting Kafka Streams
* @param streams running {@code KafkaStreams} instance
* @param config config of {@code KafkaStreams} instance
*/
protected void onStreamsStart(final KafkaStreams streams) {
protected void onStreamsStart(final KafkaStreams streams, final StreamsConfig config) {
// do nothing by default
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import lombok.Builder;
import lombok.NonNull;
Expand All @@ -47,7 +47,7 @@ public class StreamsExecutionOptions {
* Hook that is called after calling {@link KafkaStreams#start()}
*/
@Builder.Default
private final @NonNull Consumer<KafkaStreams> onStart = streams -> {};
private final @NonNull BiConsumer<KafkaStreams, StreamsConfig> onStart = (streams, config) -> {};
/**
* Configures {@link KafkaStreams#setStateListener(StateListener)}
*/
Expand Down Expand Up @@ -86,8 +86,8 @@ boolean shouldLeaveGroup(final Map<String, Object> originals) {
return staticMembershipDisabled || this.volatileGroupInstanceId;
}

void onStart(final KafkaStreams streams) {
this.onStart.accept(streams);
void onStart(final KafkaStreams streams, final StreamsConfig config) {
this.onStart.accept(streams, config);
}

StreamsUncaughtExceptionHandler createUncaughtExceptionHandler() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
@Slf4j
public final class StreamsRunner implements Runner {

private final @NonNull StreamsConfig config;
private final @NonNull KafkaStreams streams;
private final @NonNull CapturingStreamsUncaughtExceptionHandler exceptionHandler;
private final @NonNull StreamsShutdownStateListener shutdownListener;
Expand All @@ -61,6 +62,7 @@ public StreamsRunner(final @NonNull Topology topology, final @NonNull StreamsCon
*/
public StreamsRunner(final @NonNull Topology topology, final @NonNull StreamsConfig config,
final @NonNull StreamsExecutionOptions options) {
this.config = config;
this.streams = new KafkaStreams(topology, config);
this.exceptionHandler = new CapturingStreamsUncaughtExceptionHandler(options.createUncaughtExceptionHandler());
this.streams.setUncaughtExceptionHandler(this.exceptionHandler);
Expand Down Expand Up @@ -106,7 +108,7 @@ private void runStreams() {
log.info("Starting Kafka Streams");
this.streams.start();
log.info("Calling start hook");
this.executionOptions.onStart(this.streams);
this.executionOptions.onStart(this.streams, this.config);
}

private void awaitStreamsShutdown() {
Expand Down

0 comments on commit 761df8a

Please sign in to comment.