Commit: Create v3 (461152b)

philipp94831 committed Apr 5, 2024
1 parent 3597a20 commit 461152b
Showing 9 changed files with 793 additions and 711 deletions.
==== File 1 of 9 ====

@@ -28,6 +28,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import lombok.Getter;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
@@ -114,10 +115,19 @@ public Map<String, Object> getKafkaProperties(final KafkaEndpointConfig endpoint
         kafkaConfig.putAll(EnvironmentKafkaConfigParser.parseVariables(System.getenv()));
         kafkaConfig.putAll(this.configuration.getKafkaConfig());
         kafkaConfig.putAll(endpointConfig.createKafkaProperties());
-        kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, this.app.getUniqueAppId(this.getTopics()));
+        kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getUniqueAppId());
         return Collections.unmodifiableMap(kafkaConfig);
     }
 
+    /**
+     * Get unique application identifier of {@code StreamsApp}
+     * @return unique application identifier
+     * @see StreamsApp#getUniqueAppId(StreamsTopicConfig)
+     */
+    public String getUniqueAppId() {
+        return Objects.requireNonNull(this.app.getUniqueAppId(this.getTopics()));
+    }
+
     /**
      * Get topic configuration
      * @return topic configuration
@@ -136,7 +146,7 @@ public ExecutableStreamsApp<T> withEndpoint(final KafkaEndpointConfig endpointCo
         final Topology topology = this.createTopology(kafkaProperties);
         return ExecutableStreamsApp.<T>builder()
                 .topology(topology)
-                .streamsConfig(new StreamsConfig(kafkaProperties))
+                .config(new StreamsConfig(kafkaProperties))
                 .app(this.app)
                 .setup(() -> this.setupApp(kafkaProperties))
                 .build();
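
This first file centralizes the application id: getKafkaProperties now goes through the new public getUniqueAppId(), which wraps StreamsApp#getUniqueAppId(StreamsTopicConfig) in Objects.requireNonNull so a missing id fails fast instead of surfacing later as an invalid application.id. A minimal caller-side sketch, assuming the TestApplication fixture from the tests below can be built without further configuration; the broker address is a stand-in:

    // Hypothetical usage; TestApplication and "localhost:9092" are stand-ins.
    final ConfiguredStreamsApp<StreamsApp> configuredApp =
            new ConfiguredStreamsApp<>(new TestApplication(), StreamsAppConfiguration.builder()
                    .build());
    // New in this commit: the unique id is available directly, and never null.
    final String appId = configuredApp.getUniqueAppId();
    // The same value is written to application.id in the assembled properties.
    final Map<String, Object> properties = configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
            .brokers("localhost:9092")
            .build());
    // properties.get(StreamsConfig.APPLICATION_ID_CONFIG).equals(appId)
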
==== File 2 of 9 ====

@@ -42,7 +42,7 @@ public class ExecutableStreamsApp<T extends StreamsApp> implements AutoCloseable
     @Getter
     private final @NonNull Topology topology;
     @Getter
-    private final @NonNull StreamsConfig streamsConfig;
+    private final @NonNull StreamsConfig config;
     @Getter
     private final @NonNull T app;
     @Builder.Default
@@ -54,7 +54,7 @@ public class ExecutableStreamsApp<T extends StreamsApp> implements AutoCloseable
      */
     public StreamsCleanUpRunner createCleanUpRunner() {
         final StreamsCleanUpConfiguration configurer = this.app.setupCleanUp();
-        return StreamsCleanUpRunner.create(this.topology, this.streamsConfig, configurer);
+        return StreamsCleanUpRunner.create(this.topology, this.config, configurer);
     }
 
     /**
@@ -63,7 +63,7 @@ public StreamsCleanUpRunner createCleanUpRunner() {
      * @see StreamsRunner#StreamsRunner(Topology, StreamsConfig)
      */
     public StreamsRunner createRunner() {
-        return new StreamsRunner(this.topology, this.streamsConfig);
+        return new StreamsRunner(this.topology, this.config);
     }
 
     /**
@@ -74,7 +74,7 @@
      */
     public StreamsRunner createRunner(final StreamsExecutionOptions executionOptions) {
         this.setup.run();
-        return new StreamsRunner(this.topology, this.streamsConfig, executionOptions);
+        return new StreamsRunner(this.topology, this.config, executionOptions);
     }
 
     @Override
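
This file is a mechanical rename: the ExecutableStreamsApp builder property and field change from streamsConfig to config, and all three factory methods read the renamed field. A sketch of the renamed builder call, mirroring what ConfiguredStreamsApp#withEndpoint does above (topology, kafkaProperties, and app are assumed to be in scope; the no-op setup callback is illustrative):

    final ExecutableStreamsApp<StreamsApp> executableApp = ExecutableStreamsApp.<StreamsApp>builder()
            .topology(topology)
            .config(new StreamsConfig(kafkaProperties)) // previously .streamsConfig(...)
            .app(app)
            .setup(() -> {}) // no-op here; withEndpoint passes () -> setupApp(kafkaProperties)
            .build();
    final StreamsRunner runner = executableApp.createRunner();
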
==== File 3 of 9 ====

@@ -49,7 +49,7 @@ public class KafkaEndpointConfig {
      * </ul>
      * @return properties used for connecting to Kafka
      */
-    public Map<String, String> createKafkaProperties() {
+    public Map<String, Object> createKafkaProperties() {
         final Map<String, String> kafkaConfig = new HashMap<>();
         kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokers);
         if (this.isSchemaRegistryConfigured()) {
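
Widening createKafkaProperties() from Map<String, String> to Map<String, Object> aligns it with getKafkaProperties in the first file, which already assembles a Map<String, Object>. An Object-valued map can also carry class-typed settings alongside strings, as in this sketch (the serializer entry is illustrative, not part of the commit):

    final Map<String, Object> properties = new HashMap<>(endpointConfig.createKafkaProperties());
    // Object values allow classes as well as strings:
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
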
==== File 4 of 9 ====

@@ -26,6 +26,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import java.time.Duration;
+import java.util.Map;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import lombok.Builder;
@@ -70,18 +71,18 @@ public class StreamsExecutionOptions {
     @Builder.Default
     private final Duration closeTimeout = Duration.ofMillis(Long.MAX_VALUE);
 
-    private static boolean isStaticMembershipDisabled(final StreamsConfig config) {
-        return config.originals().get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG) == null;
+    private static boolean isStaticMembershipDisabled(final Map<String, Object> originals) {
+        return originals.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG) == null;
     }
 
     CloseOptions createCloseOptions(final StreamsConfig config) {
-        final boolean leaveGroup = this.shouldLeaveGroup(config);
+        final boolean leaveGroup = this.shouldLeaveGroup(config.originals());
         return new CloseOptions().leaveGroup(leaveGroup).timeout(this.closeTimeout);
     }
 
     @VisibleForTesting
-    boolean shouldLeaveGroup(final StreamsConfig config) {
-        final boolean staticMembershipDisabled = isStaticMembershipDisabled(config);
+    boolean shouldLeaveGroup(final Map<String, Object> originals) {
+        final boolean staticMembershipDisabled = isStaticMembershipDisabled(originals);
         return staticMembershipDisabled || this.volatileGroupInstanceId;
     }
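
shouldLeaveGroup and isStaticMembershipDisabled now take the raw originals map instead of a StreamsConfig, so the decision can be unit-tested without constructing a full StreamsConfig, which validates required settings. The rule itself is unchanged: leave the consumer group on close unless group.instance.id marks the member as static, and volatileGroupInstanceId forces leaving even for static members. Since the method is package-private (hence @VisibleForTesting), this sketch only works from the same package, as in StreamsExecutionOptionsTest further down:

    final StreamsExecutionOptions options = StreamsExecutionOptions.builder()
            .volatileGroupInstanceId(false)
            .build();
    options.shouldLeaveGroup(Map.of());
    // -> true: no group.instance.id, membership is dynamic
    options.shouldLeaveGroup(Map.of(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1"));
    // -> false: a static member keeps its group membership on close
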
==== File 5 of 9 ====

@@ -32,6 +32,7 @@
 import java.util.Map;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.Test;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
 
 class ConfiguredProducerAppTest {
 
@@ -52,6 +53,25 @@ void shouldPrioritizeConfigCLIParameters() {
                 .containsEntry("hello", "world");
     }
 
+    @Test
+    @SetEnvironmentVariable(key = "KAFKA_FOO", value = "baz")
+    @SetEnvironmentVariable(key = "KAFKA_KAFKA", value = "streams")
+    void shouldPrioritizeEnvironmentConfigs() {
+        final ConfiguredProducerApp<ProducerApp> configuredApp =
+                new ConfiguredProducerApp<>(new TestProducer(), ProducerAppConfiguration.builder()
+                        .kafkaConfig(Map.of(
+                                "foo", "baz",
+                                "kafka", "streams"
+                        ))
+                        .build());
+        assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
+                .brokers("fake")
+                .build()))
+                .containsEntry("foo", "baz")
+                .containsEntry("kafka", "streams")
+                .containsEntry("hello", "world");
+    }
+
     @Test
     void shouldSetDefaultAvroSerializerWhenSchemaRegistryUrlIsSet() {
         final ConfiguredProducerApp<ProducerApp> configuredApp =
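
The new shouldPrioritizeEnvironmentConfigs test (added to both this producer test class and the streams variant below) uses junit-pioneer's @SetEnvironmentVariable to inject KAFKA_-prefixed variables, which getKafkaProperties picks up via EnvironmentKafkaConfigParser.parseVariables(System.getenv()). From the fixtures, KAFKA_FOO maps to foo and KAFKA_KAFKA to kafka, i.e. the prefix is stripped and the remainder lowercased; any further translation rules are not visible in this diff. A sketch of that assumed contract:

    // Assumed behavior, inferred from the test fixtures above.
    final Map<String, String> environment = Map.of(
            "KAFKA_FOO", "baz",
            "KAFKA_KAFKA", "streams",
            "HOME", "/home/user"); // presumably ignored: no KAFKA_ prefix
    // EnvironmentKafkaConfigParser.parseVariables(environment)
    // -> {foo=baz, kafka=streams}
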
==== File 6 of 9 ====

@@ -33,6 +33,7 @@
 import java.util.Map;
 import org.apache.kafka.common.serialization.Serdes.StringSerde;
 import org.junit.jupiter.api.Test;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
 
 class ConfiguredStreamsAppTest {
 
@@ -53,6 +54,25 @@ void shouldPrioritizeConfigCLIParameters() {
                 .containsEntry("hello", "world");
     }
 
+    @Test
+    @SetEnvironmentVariable(key = "KAFKA_FOO", value = "baz")
+    @SetEnvironmentVariable(key = "KAFKA_KAFKA", value = "streams")
+    void shouldPrioritizeEnvironmentConfigs() {
+        final ConfiguredStreamsApp<StreamsApp> configuredApp =
+                new ConfiguredStreamsApp<>(new TestApplication(), StreamsAppConfiguration.builder()
+                        .kafkaConfig(Map.of(
+                                "foo", "baz",
+                                "kafka", "streams"
+                        ))
+                        .build());
+        assertThat(configuredApp.getKafkaProperties(KafkaEndpointConfig.builder()
+                .brokers("fake")
+                .build()))
+                .containsEntry("foo", "baz")
+                .containsEntry("kafka", "streams")
+                .containsEntry("hello", "world");
+    }
+
     @Test
     void shouldSetDefaultAvroSerdeWhenSchemaRegistryUrlIsSet() {
         final ConfiguredStreamsApp<StreamsApp> configuredApp =
==== File 7 of 9 ====

@@ -29,7 +29,6 @@
 
 import java.util.Map;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.streams.StreamsConfig;
 import org.junit.jupiter.api.Test;
 
 public class StreamsExecutionOptionsTest {
@@ -38,26 +37,26 @@
     void shouldLeaveGroup() {
         final StreamsExecutionOptions options = StreamsExecutionOptions.builder()
                 .build();
-        assertThat(options.shouldLeaveGroup(new StreamsConfig(emptyMap()))).isTrue();
+        assertThat(options.shouldLeaveGroup(emptyMap())).isTrue();
     }
 
     @Test
     void shouldNotLeaveGroup() {
         final StreamsExecutionOptions options = StreamsExecutionOptions.builder()
                 .volatileGroupInstanceId(false)
                 .build();
-        assertThat(options.shouldLeaveGroup(new StreamsConfig(Map.of(
+        assertThat(options.shouldLeaveGroup(Map.of(
                 ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "foo"
-        )))).isFalse();
+        ))).isFalse();
     }
 
     @Test
     void shouldLeaveGroupWithVolatileGroupId() {
         final StreamsExecutionOptions options = StreamsExecutionOptions.builder()
                 .volatileGroupInstanceId(true)
                 .build();
-        assertThat(options.shouldLeaveGroup(new StreamsConfig(Map.of(
+        assertThat(options.shouldLeaveGroup(Map.of(
                 ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "foo"
-        )))).isTrue();
+        ))).isTrue();
     }
 }
==== File 8 of 9 ====

@@ -27,7 +27,6 @@
 import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
 import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
 import static net.mguenther.kafka.junit.Wait.delay;
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -37,7 +36,6 @@
 import com.bakdata.kafka.KafkaEndpointConfig;
 import com.bakdata.kafka.StreamsApp;
 import com.bakdata.kafka.StreamsAppConfiguration;
-import com.bakdata.kafka.StreamsAppConfiguration.StreamsAppConfigurationBuilder;
 import com.bakdata.kafka.StreamsExecutionOptions;
 import com.bakdata.kafka.StreamsRunner;
 import com.bakdata.kafka.StreamsTopicConfig;
@@ -65,34 +63,46 @@
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
 import org.apache.kafka.streams.kstream.KStream;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
+@ExtendWith(SoftAssertionsExtension.class)
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
 class RunStreamsAppTest {
     private static final int TIMEOUT_SECONDS = 10;
     private EmbeddedKafkaCluster kafkaCluster;
     @Mock
     private StreamsUncaughtExceptionHandler uncaughtExceptionHandler;
     @Mock
     private StateListener stateListener;
+    @InjectSoftAssertions
+    private SoftAssertions softly;
 
-    private static StreamsAppConfigurationBuilder newConfiguration() {
-        return StreamsAppConfiguration.builder()
-                .kafkaConfig(Map.of(
-                        ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
-                ));
-    }
-
-    private static void run(final StreamsRunner runner) {
+    static void run(final StreamsRunner runner) {
         // run in Thread because the application blocks indefinitely
         new Thread(runner::run).start();
     }
 
+    static ConfiguredStreamsApp<StreamsApp> configureApp(final StreamsApp app, final StreamsTopicConfig topics) {
+        final StreamsAppConfiguration configuration = StreamsAppConfiguration.builder()
+                .kafkaConfig(Map.of(
+                        ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
+                ))
+                .topics(topics)
+                .build();
+        return new ConfiguredStreamsApp<>(app, configuration);
+    }
+
     @BeforeEach
     void setup() {
         this.kafkaCluster = provisionWith(defaultClusterConfig());
@@ -125,7 +135,7 @@ void shouldRunApp() throws InterruptedException {
                 .build();
         this.kafkaCluster.send(kvSendKeyValuesTransactionalBuilder);
         delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-        assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, String.class)
+        this.softly.assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, String.class)
                 .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                 .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                 .build()))
@@ -159,7 +169,7 @@ void shouldUseMultipleExtraInputTopics() throws InterruptedException {
                 .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                 .build());
         delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-        assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, String.class)
+        this.softly.assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, String.class)
                 .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                 .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                 .build()))
@@ -187,8 +197,8 @@ void shouldThrowOnMissingInputTopic() throws InterruptedException {
         thread.setUncaughtExceptionHandler(handler);
         thread.start();
         delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-        assertThat(thread.isAlive()).isFalse();
-        assertThat(handler.getLastException()).isInstanceOf(MissingSourceTopicException.class);
+        this.softly.assertThat(thread.isAlive()).isFalse();
+        this.softly.assertThat(handler.getLastException()).isInstanceOf(MissingSourceTopicException.class);
         verify(this.uncaughtExceptionHandler).handle(any());
         verify(this.stateListener).onChange(State.ERROR, State.PENDING_ERROR);
     }
@@ -221,20 +231,17 @@ void shouldCloseOnMapError() throws InterruptedException {
                 .build();
         this.kafkaCluster.send(kvSendKeyValuesTransactionalBuilder);
         delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
-        assertThat(thread.isAlive()).isFalse();
-        assertThat(handler.getLastException()).isInstanceOf(StreamsException.class)
-                .satisfies(e -> assertThat(e.getCause()).hasMessage("Error in map"));
+        this.softly.assertThat(thread.isAlive()).isFalse();
+        this.softly.assertThat(handler.getLastException()).isInstanceOf(StreamsException.class)
+                .satisfies(e -> this.softly.assertThat(e.getCause()).hasMessage("Error in map"));
         verify(this.uncaughtExceptionHandler).handle(any());
         verify(this.stateListener).onChange(State.ERROR, State.PENDING_ERROR);
         }
     }
 
     private ExecutableStreamsApp<StreamsApp> setupApp(final StreamsApp app, final StreamsTopicConfig topics) {
-        final StreamsAppConfiguration configuration = newConfiguration()
-                .topics(topics)
-                .build();
-        final ConfiguredStreamsApp<StreamsApp> appConfiguredStreamsApp = new ConfiguredStreamsApp<>(app, configuration);
-        return appConfiguredStreamsApp.withEndpoint(KafkaEndpointConfig.builder()
+        final ConfiguredStreamsApp<StreamsApp> configuredApp = configureApp(app, topics);
+        return configuredApp.withEndpoint(KafkaEndpointConfig.builder()
                 .brokers(this.kafkaCluster.getBrokerList())
                 .build());
     }
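
This test class switches from plain Assertions.assertThat to AssertJ's JUnit 5 soft assertions: SoftAssertionsExtension injects a SoftAssertions instance into the softly field, and every softly.assertThat failure is collected and reported at the end of the test instead of aborting at the first mismatch. A self-contained sketch of the same wiring (class and test names are invented):

    import org.assertj.core.api.SoftAssertions;
    import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
    import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;

    @ExtendWith(SoftAssertionsExtension.class)
    class SoftAssertionsExampleTest {
        @InjectSoftAssertions
        private SoftAssertions softly;

        @Test
        void reportsAllFailuresTogether() {
            softly.assertThat(1 + 1).isEqualTo(2);
            softly.assertThat("streams").startsWith("str");
            // Both assertions execute even if the first fails; the extension
            // asserts them all at the end of the test.
        }
    }
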
==== File 9 of 9 ====

(Diff not loaded.)
