Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default serialization config to apps #239

Merged
merged 15 commits into from
Jul 25, 2024
Merged
26 changes: 18 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ and `getUniqueAppId()`. You can define the topology of your application in `buil

```java
import com.bakdata.kafka.KafkaStreamsApplication;
import com.bakdata.kafka.SerdeConfig;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsBootstrapApplication extends KafkaStreamsApplication {
public class MyStreamsApplication extends KafkaStreamsApplication {
public static void main(final String[] args) {
startApplication(new StreamsBootstrapApplication(), args);
startApplication(new MyStreamsApplication(), args);
}

@Override
Expand All @@ -86,6 +88,11 @@ public class StreamsBootstrapApplication extends KafkaStreamsApplication {
return "streams-bootstrap-app-" + topics.getOutputTopic();
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}

// Optionally you can define custom Kafka properties
@Override
public Map<String, Object> createKafkaProperties() {
Expand Down Expand Up @@ -125,8 +132,6 @@ The following configuration options are available:

- `--volatile-group-instance-id`: Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.

- `--debug`: Configure logging to debug

Additionally, the following commands are available:

- `clean`: Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate
Expand All @@ -144,12 +149,14 @@ import com.bakdata.kafka.KafkaProducerApplication;
import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerBuilder;
import com.bakdata.kafka.ProducerRunnable;
import com.bakdata.kafka.SerializerConfig;
import java.util.Map;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

public class StreamsBootstrapApplication extends KafkaProducerApplication {
public class MyProducerApplication extends KafkaProducerApplication {
public static void main(final String[] args) {
startApplication(new StreamsBootstrapApplication(), args);
startApplication(new MyProducerApplication(), args);
}

@Override
Expand All @@ -164,6 +171,11 @@ public class StreamsBootstrapApplication extends KafkaProducerApplication {
};
}

@Override
public SerializerConfig defaultSerializationConfig() {
return new SerializerConfig(StringSerializer.class, StringSerializer.class);
}

// Optionally you can define custom Kafka properties
@Override
public Map<String, Object> createKafkaProperties() {
Expand All @@ -188,8 +200,6 @@ The following configuration options are available:

- `--extra-output-topics`: Additional named output topics (`<String=String>[,<String=String>...]`)

- `--debug`: Configure logging to debug

Additionally, the following commands are available:

- `clean`: Delete all output topics associated with the Kafka Producer application.
Expand Down
4 changes: 0 additions & 4 deletions charts/producer-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.streams.schemaRegistryUrl | quote }}
{{- end }}
{{- if hasKey .Values "debug" }}
- name: "{{ .Values.configurationEnvPrefix }}_DEBUG"
value: {{ .Values.debug | quote }}
{{- end }}
{{- if hasKey .Values.streams "outputTopic" }}
- name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC"
value: {{ .Values.streams.outputTopic | quote }}
Expand Down
2 changes: 0 additions & 2 deletions charts/producer-app-cleanup-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ streams:
commandLine: {}
# MY_CLI_PARAM: "foo-bar"

debug: false

env: {}
# MY_ENV_VARIABLE: foo-bar

Expand Down
1 change: 0 additions & 1 deletion charts/producer-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ Alternatively, a YAML file that specifies the values for the parameters can be p
| ------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `configurationEnvPrefix` | Prefix for environment variables to use that should be parsed as command line arguments. | `APP` |
| `commandLine` | Map of command line arguments passed to the producer app. | `{}` |
| `debug` | Configure logging to debug | `false` |
| `env` | Custom environment variables | `{}` |
| `secrets` | Custom secret environment variables. Prefix with `configurationEnvPrefix` in order to pass secrets to command line or prefix with `STREAMS_` to pass secrets to Kafka Streams configuration. E.g., `APP_MY_PARAM` would be passed as `--my-param` and `STREAMS_MAX_POLL_TIMEOUT_MS` would be translated to `max.poll.timeout.ms`. | `{}` |
| `secretRefs` | Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret `name` and `key`. | `{}` |
Expand Down
4 changes: 0 additions & 4 deletions charts/producer-app/templates/pod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.streams.schemaRegistryUrl | quote }}
{{- end }}
{{- if hasKey .Values "debug" }}
- name: "{{ .Values.configurationEnvPrefix }}_DEBUG"
value: {{ .Values.debug | quote }}
{{- end }}
{{- if hasKey .Values.streams "outputTopic" }}
- name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC"
value: {{ .Values.streams.outputTopic | quote }}
Expand Down
2 changes: 0 additions & 2 deletions charts/producer-app/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ streams:
commandLine: {}
# MY_CLI_PARAM: "foo-bar"

debug: false

env: {}
# MY_ENV_VARIABLE: foo-bar

Expand Down
4 changes: 0 additions & 4 deletions charts/streams-app-cleanup-job/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.streams.schemaRegistryUrl | quote }}
{{- end }}
{{- if hasKey .Values "debug" }}
- name: "{{ .Values.configurationEnvPrefix }}_DEBUG"
value: {{ .Values.debug | quote }}
{{- end }}
{{- if and (hasKey .Values.streams "inputTopics") (.Values.streams.inputTopics) }}
- name: "{{ .Values.configurationEnvPrefix }}_INPUT_TOPICS"
value: {{ .Values.streams.inputTopics | join "," | quote }}
Expand Down
2 changes: 0 additions & 2 deletions charts/streams-app-cleanup-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ streams:
commandLine: {}
# MY_CLI_PARAM: "foo-bar"

debug: false

env: {}
# MY_ENV_VARIABLE: foo-bar
#
Expand Down
1 change: 0 additions & 1 deletion charts/streams-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ Alternatively, a YAML file that specifies the values for the parameters can be p
| ------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `configurationEnvPrefix` | Prefix for environment variables to use that should be parsed as command line arguments. | `APP` |
| `commandLine` | Map of command line arguments passed to the streams app. | `{}` |
| `debug` | Configure logging to debug | `false` |
| `env` | Custom environment variables | `{}` |
| `secrets` | Custom secret environment variables. Prefix with `configurationEnvPrefix` in order to pass secrets to command line or prefix with `STREAMS_` to pass secrets to Kafka Streams configuration. E.g., `APP_MY_PARAM` would be passed as `--my-param` and `STREAMS_MAX_POLL_TIMEOUT_MS` would be translated to `max.poll.timeout.ms`. | `{}` |
| `secretRefs` | Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret `name` and `key`. | `{}` |
Expand Down
4 changes: 0 additions & 4 deletions charts/streams-app/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,6 @@ spec:
- name: "{{ .Values.configurationEnvPrefix }}_SCHEMA_REGISTRY_URL"
value: {{ .Values.streams.schemaRegistryUrl | quote }}
{{- end }}
{{- if hasKey .Values "debug" }}
- name: "{{ .Values.configurationEnvPrefix }}_DEBUG"
value: {{ .Values.debug | quote }}
{{- end }}
{{- if and (hasKey .Values.streams "inputTopics") (.Values.streams.inputTopics) }}
- name: "{{ .Values.configurationEnvPrefix }}_INPUT_TOPICS"
value: {{ .Values.streams.inputTopics | join "," | quote }}
Expand Down
2 changes: 0 additions & 2 deletions charts/streams-app/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ streams:
commandLine: {}
# MY_CLI_PARAM: "foo-bar"

debug: false

env: {}
# MY_ENV_VARIABLE: foo-bar

Expand Down
5 changes: 2 additions & 3 deletions streams-bootstrap-cli/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ plugins {
dependencies {
api(project(":streams-bootstrap-core"))
api(group = "info.picocli", name = "picocli", version = "4.7.5")
val log4jVersion: String by project
implementation(group = "org.apache.logging.log4j", name = "log4j-core", version = log4jVersion)
implementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)

val junitVersion: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
Expand All @@ -31,4 +28,6 @@ dependencies {
name = "schema-registry-mock-junit5",
version = fluentKafkaVersion
)
val log4jVersion: String by project
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.ParseResult;
Expand All @@ -56,7 +54,6 @@
* <li>{@link #outputTopic}</li>
* <li>{@link #extraOutputTopics}</li>
* <li>{@link #brokers}</li>
* <li>{@link #debug}</li>
* <li>{@link #schemaRegistryUrl}</li>
* <li>{@link #kafkaConfig}</li>
* </ul>
Expand Down Expand Up @@ -92,8 +89,6 @@ public abstract class KafkaApplication<R extends Runner, CR extends CleanUpRunne
private Map<String, String> extraOutputTopics = emptyMap();
@CommandLine.Option(names = "--brokers", required = true, description = "Broker addresses to connect to")
private String brokers;
@CommandLine.Option(names = "--debug", arity = "0..1", description = "Configure logging to debug")
private boolean debug;
@CommandLine.Option(names = "--schema-registry-url", description = "URL of Schema Registry")
private String schemaRegistryUrl;
@CommandLine.Option(names = "--kafka-config", split = ",", description = "Additional Kafka properties")
Expand Down Expand Up @@ -299,20 +294,16 @@ public final CleanableApp<CR> createCleanableApp() {
protected abstract CA createConfiguredApp(final A app, AppConfiguration<T> configuration);

/**
* Configure application when running in debug mode. By default, Log4j2 log level is configured to debug for
* {@code com.bakdata} and the applications package.
* Called before starting the application, e.g., invoking {@link #run()}
*/
protected void configureDebug() {
Configurator.setLevel("com.bakdata", Level.DEBUG);
Configurator.setLevel(this.getClass().getPackageName(), Level.DEBUG);
protected void onApplicationStart() {
// do nothing by default
}

private void startApplication() {
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
this.onApplicationStart();
log.info("Starting application");
if (this.debug) {
this.configureDebug();
}
log.debug("Starting application: {}", this);
}

Expand Down
41 changes: 41 additions & 0 deletions streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import net.mguenther.kafka.junit.SendKeyValues;
import net.mguenther.kafka.junit.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.Consumed;
import org.junit.jupiter.api.Test;

Expand All @@ -63,6 +64,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}

Expand Down Expand Up @@ -91,6 +97,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
}), new String[]{
"--brokers", "localhost:9092",
"--schema-registry-url", "http://localhost:8081",
Expand All @@ -115,6 +126,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}

Expand Down Expand Up @@ -147,6 +163,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}

Expand Down Expand Up @@ -179,6 +200,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
return "app";
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
})) {
kafkaCluster.start();
kafkaCluster.createTopic(TopicConfig.withName(input).build());
Expand Down Expand Up @@ -210,6 +236,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
return "app";
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}
})) {
kafkaCluster.start();
kafkaCluster.createTopic(TopicConfig.withName(input).build());
Expand Down Expand Up @@ -249,6 +280,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}
}, new String[]{
Expand All @@ -275,6 +311,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

@NoArgsConstructor
Expand Down Expand Up @@ -57,6 +58,11 @@ public String getUniqueAppId(final StreamsTopicConfig topics) {
return CloseFlagApp.this.getClass().getSimpleName() + "-" + topics.getOutputTopic();
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}

@Override
public void close() {
CloseFlagApp.this.appClosed = true;
Expand Down
Loading
Loading