From 761df8a652dc2b1750e5426d9e791c4853029bd1 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Wed, 12 Jun 2024 17:42:10 +0200 Subject: [PATCH] Update --- .../java/com/bakdata/kafka/KafkaStreamsApplication.java | 4 +++- .../java/com/bakdata/kafka/StreamsExecutionOptions.java | 8 ++++---- .../src/main/java/com/bakdata/kafka/StreamsRunner.java | 4 +++- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index 5424b4b5..0f37bdcd 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -38,6 +38,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.StateListener; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import picocli.CommandLine; import picocli.CommandLine.Command; @@ -159,8 +160,9 @@ protected StreamsUncaughtExceptionHandler createUncaughtExceptionHandler() { /** * Called after starting Kafka Streams * @param streams running {@code KafkaStreams} instance + * @param config config of {@code KafkaStreams} instance */ - protected void onStreamsStart(final KafkaStreams streams) { + protected void onStreamsStart(final KafkaStreams streams, final StreamsConfig config) { // do nothing by default } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsExecutionOptions.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsExecutionOptions.java index 6d9c617c..0e93f14a 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsExecutionOptions.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsExecutionOptions.java @@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import java.time.Duration; import java.util.Map; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import java.util.function.Supplier; import lombok.Builder; import lombok.NonNull; @@ -47,7 +47,7 @@ public class StreamsExecutionOptions { * Hook that is called after calling {@link KafkaStreams#start()} */ @Builder.Default - private final @NonNull Consumer onStart = streams -> {}; + private final @NonNull BiConsumer onStart = (streams, config) -> {}; /** * Configures {@link KafkaStreams#setStateListener(StateListener)} */ @@ -86,8 +86,8 @@ boolean shouldLeaveGroup(final Map originals) { return staticMembershipDisabled || this.volatileGroupInstanceId; } - void onStart(final KafkaStreams streams) { - this.onStart.accept(streams); + void onStart(final KafkaStreams streams, final StreamsConfig config) { + this.onStart.accept(streams, config); } StreamsUncaughtExceptionHandler createUncaughtExceptionHandler() { diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsRunner.java index 6b3d1adf..2b20cc8b 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsRunner.java @@ -38,6 +38,7 @@ @Slf4j public final class StreamsRunner implements Runner { + private final @NonNull StreamsConfig config; private final @NonNull KafkaStreams streams; private final @NonNull CapturingStreamsUncaughtExceptionHandler exceptionHandler; private final @NonNull StreamsShutdownStateListener shutdownListener; @@ -61,6 +62,7 @@ public StreamsRunner(final @NonNull Topology topology, final @NonNull StreamsCon */ public StreamsRunner(final @NonNull Topology topology, final @NonNull StreamsConfig config, final @NonNull StreamsExecutionOptions options) { + this.config = config; this.streams = new KafkaStreams(topology, config); this.exceptionHandler = new CapturingStreamsUncaughtExceptionHandler(options.createUncaughtExceptionHandler()); this.streams.setUncaughtExceptionHandler(this.exceptionHandler); @@ -106,7 +108,7 @@ private void runStreams() { log.info("Starting Kafka Streams"); this.streams.start(); log.info("Calling start hook"); - this.executionOptions.onStart(this.streams); + this.executionOptions.onStart(this.streams, this.config); } private void awaitStreamsShutdown() {