diff --git a/charts/producer-app-cleanup-job/templates/job.yaml b/charts/producer-app-cleanup-job/templates/job.yaml index cdc999e9..50a4a4ea 100644 --- a/charts/producer-app-cleanup-job/templates/job.yaml +++ b/charts/producer-app-cleanup-job/templates/job.yaml @@ -61,7 +61,7 @@ spec: - name: ENV_PREFIX value: {{ .Values.configurationEnvPrefix }}_ {{- range $key, $value := .Values.streams.config }} - - name: {{ printf "STREAMS_%s" $key | replace "." "_" | upper | quote }} + - name: {{ printf "KAFKA_%s" $key | replace "." "_" | upper | quote }} value: {{ $value | quote }} {{- end }} {{- if hasKey .Values.streams "brokers" }} diff --git a/charts/producer-app/README.md b/charts/producer-app/README.md index d6a4574b..d5dd1823 100644 --- a/charts/producer-app/README.md +++ b/charts/producer-app/README.md @@ -56,15 +56,15 @@ Alternatively, a YAML file that specifies the values for the parameters can be p ### Other -| Parameter | Description | Default | -| ------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | -| `configurationEnvPrefix` | Prefix for environment variables to use that should be parsed as command line arguments. | `APP` | -| `commandLine` | Map of command line arguments passed to the producer app. | `{}` | -| `env` | Custom environment variables | `{}` | -| `secrets` | Custom secret environment variables. Prefix with `configurationEnvPrefix` in order to pass secrets to command line or prefix with `STREAMS_` to pass secrets to Kafka Streams configuration. E.g., `APP_MY_PARAM` would be passed as `--my-param` and `STREAMS_MAX_POLL_TIMEOUT_MS` would be translated to `max.poll.timeout.ms`. | `{}` | -| `secretRefs` | Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret `name` and `key`. | `{}` | -| `secretFilesRefs` | Mount existing secrets as volumes | `[]` | -| `files` | Map of files to mount for the app. File will be mounted as `$value.mountPath/$key`. `$value.content` denotes file content (recommended to be used with `--set-file`). | `{}` | +| Parameter | Description | Default | +|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| +| `configurationEnvPrefix` | Prefix for environment variables to use that should be parsed as command line arguments. | `APP` | +| `commandLine` | Map of command line arguments passed to the producer app. | `{}` | +| `env` | Custom environment variables | `{}` | +| `secrets` | Custom secret environment variables. Prefix with `configurationEnvPrefix` in order to pass secrets to command line or prefix with `KAFKA_` to pass secrets to Kafka Streams configuration. E.g., `APP_MY_PARAM` would be passed as `--my-param` and `KAFKA_MAX_POLL_TIMEOUT_MS` would be translated to `max.poll.timeout.ms`. | `{}` | +| `secretRefs` | Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret `name` and `key`. | `{}` | +| `secretFilesRefs` | Mount existing secrets as volumes | `[]` | +| `files` | Map of files to mount for the app. File will be mounted as `$value.mountPath/$key`. `$value.content` denotes file content (recommended to be used with `--set-file`). | `{}` | ### JVM diff --git a/charts/producer-app/templates/pod.yaml b/charts/producer-app/templates/pod.yaml index d697ee5b..50bc96e6 100644 --- a/charts/producer-app/templates/pod.yaml +++ b/charts/producer-app/templates/pod.yaml @@ -45,7 +45,7 @@ spec: - name: ENV_PREFIX value: {{ .Values.configurationEnvPrefix }}_ {{- range $key, $value := .Values.streams.config }} - - name: {{ printf "STREAMS_%s" $key | replace "." "_" | upper | quote }} + - name: {{ printf "KAFKA_%s" $key | replace "." "_" | upper | quote }} value: {{ $value | quote }} {{- end }} {{- if hasKey .Values.streams "brokers" }} diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml index fd8ec5fa..931f2821 100644 --- a/charts/streams-app-cleanup-job/templates/job.yaml +++ b/charts/streams-app-cleanup-job/templates/job.yaml @@ -65,7 +65,7 @@ spec: - name: ENV_PREFIX value: {{ .Values.configurationEnvPrefix }}_ {{- range $key, $value := .Values.streams.config }} - - name: {{ printf "STREAMS_%s" $key | replace "." "_" | upper | quote }} + - name: {{ printf "KAFKA_%s" $key | replace "." "_" | upper | quote }} value: {{ $value | quote }} {{- end }} {{- if hasKey .Values.streams "brokers" }} diff --git a/charts/streams-app/README.md b/charts/streams-app/README.md index cd4c6795..5fcc3b47 100644 --- a/charts/streams-app/README.md +++ b/charts/streams-app/README.md @@ -67,15 +67,15 @@ Alternatively, a YAML file that specifies the values for the parameters can be p ### Other -| Parameter | Description | Default | -| ------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | -| `configurationEnvPrefix` | Prefix for environment variables to use that should be parsed as command line arguments. | `APP` | -| `commandLine` | Map of command line arguments passed to the streams app. | `{}` | -| `env` | Custom environment variables | `{}` | -| `secrets` | Custom secret environment variables. Prefix with `configurationEnvPrefix` in order to pass secrets to command line or prefix with `STREAMS_` to pass secrets to Kafka Streams configuration. E.g., `APP_MY_PARAM` would be passed as `--my-param` and `STREAMS_MAX_POLL_TIMEOUT_MS` would be translated to `max.poll.timeout.ms`. | `{}` | -| `secretRefs` | Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret `name` and `key`. | `{}` | -| `secretFilesRefs` | Mount existing secrets as volumes | `[]` | -| `files` | Map of files to mount for the app. File will be mounted as `$value.mountPath/$key`. `$value.content` denotes file content (recommended to be used with `--set-file`). | `{}` | +| Parameter | Description | Default | +|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| +| `configurationEnvPrefix` | Prefix for environment variables to use that should be parsed as command line arguments. | `APP` | +| `commandLine` | Map of command line arguments passed to the streams app. | `{}` | +| `env` | Custom environment variables | `{}` | +| `secrets` | Custom secret environment variables. Prefix with `configurationEnvPrefix` in order to pass secrets to command line or prefix with `KAFKA_` to pass secrets to Kafka Streams configuration. E.g., `APP_MY_PARAM` would be passed as `--my-param` and `KAFKA_MAX_POLL_TIMEOUT_MS` would be translated to `max.poll.timeout.ms`. | `{}` | +| `secretRefs` | Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret `name` and `key`. | `{}` | +| `secretFilesRefs` | Mount existing secrets as volumes | `[]` | +| `files` | Map of files to mount for the app. File will be mounted as `$value.mountPath/$key`. `$value.content` denotes file content (recommended to be used with `--set-file`). | `{}` | ### JMX Configuration diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml index 77e5ac7b..2c061ca2 100644 --- a/charts/streams-app/templates/deployment.yaml +++ b/charts/streams-app/templates/deployment.yaml @@ -88,7 +88,7 @@ spec: - name: ENV_PREFIX value: {{ .Values.configurationEnvPrefix }}_ {{- range $key, $value := .Values.streams.config }} - - name: {{ printf "STREAMS_%s" $key | replace "." "_" | upper | quote }} + - name: {{ printf "KAFKA_%s" $key | replace "." "_" | upper | quote }} value: {{ $value | quote }} {{- end }} {{- range .Values.ports }} @@ -97,12 +97,12 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP - - name: STREAMS_APPLICATION_SERVER + - name: KAFKA_APPLICATION_SERVER value: "$(POD_IP):{{ .containerPort }}" {{- end }} {{- end }} {{- if .Values.streams.staticMembership }} - - name: STREAMS_GROUP_INSTANCE_ID + - name: KAFKA_GROUP_INSTANCE_ID valueFrom: fieldRef: fieldPath: metadata.name diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index b76bd3e1..0a00d3c1 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -125,8 +125,8 @@ public static int startApplicationWithoutExit(final KafkaApplication environmentArguments = new EnvironmentArgumentsParser(ENV_PREFIX) .parseVariables(System.getenv()); final Collection allArgs = new ArrayList<>(environmentArguments); diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts index 4bd028b1..ce3f20eb 100644 --- a/streams-bootstrap-core/build.gradle.kts +++ b/streams-bootstrap-core/build.gradle.kts @@ -18,7 +18,6 @@ dependencies { name = "slf4j-api", version = "2.0.9" ) // required because other dependencies use Slf4j 1.x which is not properly resolved if this library is used in test scope - implementation(group = "com.google.guava", name = "guava", version = "33.0.0-jre") implementation(group = "org.jooq", name = "jool", version = "0.9.14") val junitVersion: String by project diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java index de2f409d..59983d2c 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java @@ -87,7 +87,7 @@ private static Map createBaseConfig(final KafkaEndpointConfig en * *
  • * Configs provided via environment variables (see - * {@link EnvironmentStreamsConfigParser#parseVariables(Map)}) + * {@link EnvironmentKafkaConfigParser#parseVariables(Map)}) *
  • *
  • * Configs provided by {@link AppConfiguration#getKafkaConfig()} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java index efe0a3d2..86bb3c2e 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java @@ -92,7 +92,7 @@ private static Map createBaseConfig(final KafkaEndpointConfig en *
  • *
  • * Configs provided via environment variables (see - * {@link EnvironmentStreamsConfigParser#parseVariables(Map)}) + * {@link EnvironmentKafkaConfigParser#parseVariables(Map)}) *
  • *
  • * Configs provided by {@link AppConfiguration#getKafkaConfig()} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/EnvironmentStreamsConfigParser.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/EnvironmentKafkaConfigParser.java similarity index 85% rename from streams-bootstrap-core/src/main/java/com/bakdata/kafka/EnvironmentStreamsConfigParser.java rename to streams-bootstrap-core/src/main/java/com/bakdata/kafka/EnvironmentKafkaConfigParser.java index b2bd4f97..76221db3 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/EnvironmentStreamsConfigParser.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/EnvironmentKafkaConfigParser.java @@ -31,20 +31,20 @@ import java.util.stream.Collectors; /** - * Parse configuration properties of a Kafka Streams app from environment variables + * Parse configuration properties of a Kafka app from environment variables */ -public final class EnvironmentStreamsConfigParser { +public final class EnvironmentKafkaConfigParser { - static final String PREFIX = "STREAMS_"; + static final String PREFIX = "KAFKA_"; private static final Pattern UNDERSCORE = Pattern.compile("_"); private static final Pattern PREFIX_PATTERN = Pattern.compile("^" + PREFIX); - private EnvironmentStreamsConfigParser() { + private EnvironmentKafkaConfigParser() { throw new UnsupportedOperationException("Utility class"); } /** - * Parse a list of environment variables as a streams configuration. All variables starting with {@code STREAMS_} + * Parse a list of environment variables as a streams configuration. All variables starting with {@code KAFKA_} * prefix are converted. {@code _} are replaced by {@code .} * * @param environment map of environment variables @@ -53,7 +53,7 @@ private EnvironmentStreamsConfigParser() { public static Map parseVariables(final Map environment) { return environment.entrySet().stream() .filter(e -> e.getKey().startsWith(PREFIX)) - .collect(Collectors.toMap(EnvironmentStreamsConfigParser::convertEnvironmentVariable, Entry::getValue)); + .collect(Collectors.toMap(EnvironmentKafkaConfigParser::convertEnvironmentVariable, Entry::getValue)); } private static String convertEnvironmentVariable(final Entry environmentEntry) { diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedStreamsConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedStreamsConfig.java index 95b9a484..3fc6a130 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedStreamsConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ImprovedStreamsConfig.java @@ -24,12 +24,16 @@ package com.bakdata.kafka; +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_SERVER_CONFIG; + import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import lombok.NonNull; import lombok.Value; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.HostInfo; /** * Class for simplified access to configs provided by {@link StreamsConfig} @@ -66,4 +70,21 @@ public List getBoostrapServers() { public Map getKafkaProperties() { return Collections.unmodifiableMap(this.streamsConfig.originals()); } + + /** + * Retrieves the host information based on the application server configuration. + * + * @return an {@code Optional} containing the {@link HostInfo} if the + * {@link StreamsConfig#APPLICATION_SERVER_CONFIG} is set; otherwise, an empty {@code Optional}. + */ + public Optional getApplicationServer() { + final String applicationServerConfig = this.streamsConfig.getString(APPLICATION_SERVER_CONFIG); + return applicationServerConfig.isEmpty() ? Optional.empty() + : Optional.of(createHostInfo(applicationServerConfig)); + } + + private static HostInfo createHostInfo(final String applicationServerConfig) { + final String[] hostAndPort = applicationServerConfig.split(":"); + return new HostInfo(hostAndPort[0], Integer.parseInt(hostAndPort[1])); + } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java index aecd5dc2..02cf5c41 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KafkaPropertiesFactory.java @@ -40,7 +40,7 @@ class KafkaPropertiesFactory { Map createKafkaProperties(final Map configOverrides) { final Map kafkaConfig = new HashMap<>(this.baseConfig); kafkaConfig.putAll(this.app.createKafkaProperties()); - kafkaConfig.putAll(EnvironmentStreamsConfigParser.parseVariables(System.getenv())); + kafkaConfig.putAll(EnvironmentKafkaConfigParser.parseVariables(System.getenv())); kafkaConfig.putAll(this.configuration.getKafkaConfig()); kafkaConfig.putAll(this.endpointConfig.createKafkaProperties()); kafkaConfig.putAll(configOverrides); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java index 68314c09..57dbccb3 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredProducerAppTest.java @@ -61,8 +61,8 @@ void shouldPrioritizeConfigCLIParameters() { } @Test - @SetEnvironmentVariable(key = "STREAMS_FOO", value = "baz") - @SetEnvironmentVariable(key = "STREAMS_STREAMS", value = "streams") + @SetEnvironmentVariable(key = "KAFKA_FOO", value = "baz") + @SetEnvironmentVariable(key = "KAFKA_KAFKA", value = "streams") void shouldPrioritizeEnvironmentConfigs() { final AppConfiguration configuration = newAppConfiguration(); final ConfiguredProducerApp configuredApp = @@ -71,7 +71,7 @@ void shouldPrioritizeEnvironmentConfigs() { .brokers("fake") .build())) .containsEntry("foo", "baz") - .containsEntry("streams", "streams") + .containsEntry("kafka", "streams") .containsEntry("hello", "world"); } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java index c9037bc4..b1b3c648 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConfiguredStreamsAppTest.java @@ -62,8 +62,8 @@ void shouldPrioritizeConfigCLIParameters() { } @Test - @SetEnvironmentVariable(key = "STREAMS_FOO", value = "baz") - @SetEnvironmentVariable(key = "STREAMS_STREAMS", value = "streams") + @SetEnvironmentVariable(key = "KAFKA_FOO", value = "baz") + @SetEnvironmentVariable(key = "KAFKA_KAFKA", value = "streams") void shouldPrioritizeEnvironmentConfigs() { final AppConfiguration configuration = newAppConfiguration(); final ConfiguredStreamsApp configuredApp = @@ -72,7 +72,7 @@ void shouldPrioritizeEnvironmentConfigs() { .brokers("fake") .build())) .containsEntry("foo", "baz") - .containsEntry("streams", "streams") + .containsEntry("kafka", "streams") .containsEntry("hello", "world"); } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/EnvironmentStreamsConfigParserTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/EnvironmentKafkaConfigParserTest.java similarity index 82% rename from streams-bootstrap-core/src/test/java/com/bakdata/kafka/EnvironmentStreamsConfigParserTest.java rename to streams-bootstrap-core/src/test/java/com/bakdata/kafka/EnvironmentKafkaConfigParserTest.java index 78d2480e..0fdcad11 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/EnvironmentStreamsConfigParserTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/EnvironmentKafkaConfigParserTest.java @@ -29,13 +29,13 @@ import java.util.Map; import org.junit.jupiter.api.Test; -class EnvironmentStreamsConfigParserTest { +class EnvironmentKafkaConfigParserTest { @Test void shouldParseStreamsConfig() { - assertThat(EnvironmentStreamsConfigParser.parseVariables(Map.of( - "STREAMS_FOO", "bar", - "STREAMS_BAZ", "qux" + assertThat(EnvironmentKafkaConfigParser.parseVariables(Map.of( + "KAFKA_FOO", "bar", + "KAFKA_BAZ", "qux" ))) .hasSize(2) .containsEntry("foo", "bar") @@ -44,15 +44,15 @@ void shouldParseStreamsConfig() { @Test void shouldIgnoreVariablesWithoutPrefix() { - assertThat(EnvironmentStreamsConfigParser.parseVariables(Map.of( + assertThat(EnvironmentKafkaConfigParser.parseVariables(Map.of( "APP_FOO", "bar" ))).isEmpty(); } @Test void shouldConvertUnderscores() { - assertThat(EnvironmentStreamsConfigParser.parseVariables(Map.of( - "STREAMS_FOO_BAR", "baz" + assertThat(EnvironmentKafkaConfigParser.parseVariables(Map.of( + "KAFKA_FOO_BAR", "baz" ))) .hasSize(1) .containsEntry("foo.bar", "baz"); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ImprovedStreamsConfigTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ImprovedStreamsConfigTest.java index 4c1f4738..90f24a8e 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ImprovedStreamsConfigTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ImprovedStreamsConfigTest.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.HostInfo; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; @@ -95,4 +96,23 @@ void shouldGetOriginalKafkaProperties() { }); } + @Test + void shouldHaveHostInfoIfApplicationServiceIsConfigured() { + final StreamsConfig config = new StreamsConfig( + Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app", + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092", + StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:9090")); + this.softly.assertThat(new ImprovedStreamsConfig(config).getApplicationServer()) + .hasValue(new HostInfo("localhost", 9090)); + } + + @Test + void shouldReturnEmptyHostInfoIfApplicationServiceIsNotConfigured() { + final StreamsConfig config = new StreamsConfig( + Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "test-app", + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")); + this.softly.assertThat(new ImprovedStreamsConfig(config).getApplicationServer()) + .isNotPresent(); + } + }