Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jun 12, 2024
1 parent 761df8a commit b3b5ec1
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ protected StreamsUncaughtExceptionHandler createUncaughtExceptionHandler() {

/**
* Called after starting Kafka Streams
* @param streams running {@code KafkaStreams} instance
* @param config config of {@code KafkaStreams} instance
* @param runningStreams running {@link KafkaStreams} instance along with its {@link StreamsConfig} and
* {@link org.apache.kafka.streams.Topology}
*/
protected void onStreamsStart(final KafkaStreams streams, final StreamsConfig config) {
protected void onStreamsStart(final RunningStreams runningStreams) {
// do nothing by default
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import com.bakdata.kafka.StreamsExecutionOptions.StreamsExecutionOptionsBuilder;
import java.util.function.Consumer;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

/**
* A running {@link KafkaStreams} instance along with its {@link StreamsConfig} and
* {@link org.apache.kafka.streams.Topology}
*
* @see StreamsExecutionOptionsBuilder#onStart(Consumer)
*/
@Builder
@Value
public class RunningStreams {

@NonNull
StreamsConfig config;
@NonNull
Topology topology;
@NonNull
KafkaStreams streams;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.Builder;
import lombok.NonNull;
Expand All @@ -47,7 +47,7 @@ public class StreamsExecutionOptions {
* Hook that is called after calling {@link KafkaStreams#start()}
*/
@Builder.Default
private final @NonNull BiConsumer<KafkaStreams, StreamsConfig> onStart = (streams, config) -> {};
private final @NonNull Consumer<RunningStreams> onStart = (runningStreams) -> {};
/**
* Configures {@link KafkaStreams#setStateListener(StateListener)}
*/
Expand Down Expand Up @@ -86,8 +86,8 @@ boolean shouldLeaveGroup(final Map<String, Object> originals) {
return staticMembershipDisabled || this.volatileGroupInstanceId;
}

void onStart(final KafkaStreams streams, final StreamsConfig config) {
this.onStart.accept(streams, config);
void onStart(final RunningStreams runningStreams) {
this.onStart.accept(runningStreams);
}

StreamsUncaughtExceptionHandler createUncaughtExceptionHandler() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public final class StreamsRunner implements Runner {

private final @NonNull StreamsConfig config;
private final @NonNull Topology topology;
private final @NonNull KafkaStreams streams;
private final @NonNull CapturingStreamsUncaughtExceptionHandler exceptionHandler;
private final @NonNull StreamsShutdownStateListener shutdownListener;
Expand All @@ -63,6 +64,7 @@ public StreamsRunner(final @NonNull Topology topology, final @NonNull StreamsCon
public StreamsRunner(final @NonNull Topology topology, final @NonNull StreamsConfig config,
final @NonNull StreamsExecutionOptions options) {
this.config = config;
this.topology = topology;
this.streams = new KafkaStreams(topology, config);
this.exceptionHandler = new CapturingStreamsUncaughtExceptionHandler(options.createUncaughtExceptionHandler());
this.streams.setUncaughtExceptionHandler(this.exceptionHandler);
Expand Down Expand Up @@ -106,9 +108,15 @@ private boolean hasErrored() {

private void runStreams() {
log.info("Starting Kafka Streams");
log.debug("Streams topology:\n{}", this.topology.describe());
this.streams.start();
log.info("Calling start hook");
this.executionOptions.onStart(this.streams, this.config);
final RunningStreams runningStreams = RunningStreams.builder()
.streams(this.streams)
.config(this.config)
.topology(this.topology)
.build();
this.executionOptions.onStart(runningStreams);
}

private void awaitStreamsShutdown() {
Expand Down

0 comments on commit b3b5ec1

Please sign in to comment.