From 7c66ffe72219ca3b79af8f4a56d4d909f39b685e Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 26 Jul 2024 13:48:09 +0200 Subject: [PATCH] Rename to labeled --- README.md | 10 +++-- .../templates/job.yaml | 6 +-- charts/producer-app-cleanup-job/values.yaml | 4 +- charts/producer-app/README.md | 14 +++---- charts/producer-app/templates/pod.yaml | 6 +-- charts/producer-app/values.yaml | 4 +- .../templates/job.yaml | 18 ++++---- charts/streams-app-cleanup-job/values.yaml | 12 +++--- charts/streams-app/README.md | 26 ++++++------ charts/streams-app/templates/deployment.yaml | 18 ++++---- .../streams-app/templates/scaled-object.yaml | 6 +-- charts/streams-app/values.yaml | 14 +++---- .../com/bakdata/kafka/KafkaApplication.java | 7 ++-- .../kafka/KafkaProducerApplication.java | 2 +- .../kafka/KafkaStreamsApplication.java | 19 +++++---- .../test/java/com/bakdata/kafka/CliTest.java | 24 +++++------ .../bakdata/kafka/ProducerCleanUpRunner.java | 2 +- .../bakdata/kafka/ProducerTopicConfig.java | 14 +++---- .../com/bakdata/kafka/StreamsTopicConfig.java | 42 +++++++++---------- .../com/bakdata/kafka/TopologyBuilder.java | 24 +++++------ .../kafka/integration/StreamsRunnerTest.java | 14 +++---- ...putTopics.java => LabeledInputTopics.java} | 4 +- 22 files changed, 147 insertions(+), 143 deletions(-) rename streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/{NamedInputTopics.java => LabeledInputTopics.java} (96%) diff --git a/README.md b/README.md index 89e55c28..046eb00f 100644 --- a/README.md +++ b/README.md @@ -121,13 +121,15 @@ The following configuration options are available: - `--error-topic`: A topic to write errors to -- `--named-input-topics`: Additional named input topics if you need to specify multiple topics with different message +- `--labeled-input-topics`: Additional labeled input topics if you need to specify multiple topics with different + message types (`[,...]`) -- `--named-input-patterns`: Additional named input patterns if you need to specify multiple topics with different +- `--labeled-input-patterns`: Additional labeled input patterns if you need to specify multiple topics with different message types (`[,...]`) -- `--named-output-topics`: Additional named output topics if you need to specify multiple topics with different message +- `--labeled-output-topics`: Additional labeled output topics if you need to specify multiple topics with different + message types (`String=String>[,...]`) - `--volatile-group-instance-id`: Whether the group instance id is volatile, i.e., it will change on a Streams shutdown. 
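For illustration, a minimal sketch of how the renamed flags might be passed when starting an application built on this library — `MyStreamsApplication`, the broker address, and all topic names are placeholders, and the option formats follow the list above (`label=topic` entries separated by `,`, with `;` between topics that share a label):

```java
// Hypothetical launcher showing the renamed CLI options.
// MyStreamsApplication is a placeholder for your own KafkaStreamsApplication subclass.
import com.bakdata.kafka.KafkaApplication;

public class Launcher {
    public static void main(final String[] args) {
        KafkaApplication.startApplication(new MyStreamsApplication(), new String[]{
                "--brokers", "localhost:9092",
                "--input-topics", "input1,input2",
                "--labeled-input-topics", "side=input3;input4", // label=topic[;topic...]
                "--labeled-input-patterns", "side=.*-input",    // label=pattern
                "--output-topic", "output",
                "--labeled-output-topics", "side=side-output",  // label=topic
                "--error-topic", "error",
        });
    }
}
```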
@@ -198,7 +200,7 @@ The following configuration options are available: - `--output-topic`: The output topic -- `--named-output-topics`: Additional named output topics (`String=String>[,...]`) +- `--labeled-output-topics`: Additional labeled output topics (`String=String>[,...]`) Additionally, the following commands are available: diff --git a/charts/producer-app-cleanup-job/templates/job.yaml b/charts/producer-app-cleanup-job/templates/job.yaml index d2227e14..acf0ac5a 100644 --- a/charts/producer-app-cleanup-job/templates/job.yaml +++ b/charts/producer-app-cleanup-job/templates/job.yaml @@ -76,9 +76,9 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC" value: {{ .Values.kafka.outputTopic | quote }} {{- end }} - {{- if and (hasKey .Values.kafka "namedOutputTopics") (.Values.kafka.namedOutputTopics) }} - - name: "{{ .Values.configurationEnvPrefix }}_NAMED_OUTPUT_TOPICS" - value: "{{- range $key, $value := .Values.kafka.namedOutputTopics }}{{ $key }}={{ $value }},{{- end }}" + {{- if and (hasKey .Values.kafka "labeledOutputTopics") (.Values.kafka.labeledOutputTopics) }} + - name: "{{ .Values.configurationEnvPrefix }}_LABELED_OUTPUT_TOPICS" + value: "{{- range $key, $value := .Values.kafka.labeledOutputTopics }}{{ $key }}={{ $value }},{{- end }}" {{- end }} {{- range $key, $value := .Values.secrets }} - name: "{{ $key }}" diff --git a/charts/producer-app-cleanup-job/values.yaml b/charts/producer-app-cleanup-job/values.yaml index 86baf859..35322cc9 100644 --- a/charts/producer-app-cleanup-job/values.yaml +++ b/charts/producer-app-cleanup-job/values.yaml @@ -23,8 +23,8 @@ kafka: # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this. # max.request.size: "1000000" # outputTopic: output - namedOutputTopics: {} -# role: output + labeledOutputTopics: {} +# label: output commandLine: {} # MY_CLI_PARAM: "foo-bar" diff --git a/charts/producer-app/README.md b/charts/producer-app/README.md index 74697b33..3da0ffda 100644 --- a/charts/producer-app/README.md +++ b/charts/producer-app/README.md @@ -46,13 +46,13 @@ Alternatively, a YAML file that specifies the values for the parameters can be p ### Streams -| Parameter | Description | Default | -|---------------------------|------------------------------------------------------------------------------------------------------------|---------| -| `kafka.brokers` | Comma separated list of Kafka brokers to connect to. | | -| `kafka.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` | -| `kafka.config` | Configurations for your [Kafka producer app](https://kafka.apache.org/documentation/#producerconfigs). | `{}` | -| `kafka.outputTopic` | Output topic for your producer application. | | -| `kafka.namedOutputTopics` | Map of additional named output topics if you need to specify multiple topics with different message types. | `{}` | +| Parameter | Description | Default | +|-----------------------------|--------------------------------------------------------------------------------------------------------------|---------| +| `kafka.brokers` | Comma separated list of Kafka brokers to connect to. | | +| `kafka.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` | +| `kafka.config` | Configurations for your [Kafka producer app](https://kafka.apache.org/documentation/#producerconfigs). | `{}` | +| `kafka.outputTopic` | Output topic for your producer application. 
| | +| `kafka.labeledOutputTopics` | Map of additional labeled output topics if you need to specify multiple topics with different message types. | `{}` | ### Other diff --git a/charts/producer-app/templates/pod.yaml b/charts/producer-app/templates/pod.yaml index 3d8aa433..2ce6e953 100644 --- a/charts/producer-app/templates/pod.yaml +++ b/charts/producer-app/templates/pod.yaml @@ -60,9 +60,9 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_OUTPUT_TOPIC" value: {{ .Values.kafka.outputTopic | quote }} {{- end }} - {{- if and (hasKey .Values.kafka "namedOutputTopics") (.Values.kafka.namedOutputTopics) }} - - name: "{{ .Values.configurationEnvPrefix }}_NAMED_OUTPUT_TOPICS" - value: "{{- range $key, $value := .Values.kafka.namedOutputTopics }}{{ $key }}={{ $value }},{{- end }}" + {{- if and (hasKey .Values.kafka "labeledOutputTopics") (.Values.kafka.labeledOutputTopics) }} + - name: "{{ .Values.configurationEnvPrefix }}_LABELED_OUTPUT_TOPICS" + value: "{{- range $key, $value := .Values.kafka.labeledOutputTopics }}{{ $key }}={{ $value }},{{- end }}" {{- end }} {{- range $key, $value := .Values.secrets }} - name: "{{ $key }}" diff --git a/charts/producer-app/values.yaml b/charts/producer-app/values.yaml index f98b322f..d3624d67 100644 --- a/charts/producer-app/values.yaml +++ b/charts/producer-app/values.yaml @@ -56,8 +56,8 @@ kafka: # Note that YAML may convert large integers to scientific notation. Use Strings to avoid this. # max.request.size: "1000000" # outputTopic: output - namedOutputTopics: {} -# role: output + labeledOutputTopics: {} +# label: output commandLine: {} # MY_CLI_PARAM: "foo-bar" diff --git a/charts/streams-app-cleanup-job/templates/job.yaml b/charts/streams-app-cleanup-job/templates/job.yaml index 0e7a389f..6c279091 100644 --- a/charts/streams-app-cleanup-job/templates/job.yaml +++ b/charts/streams-app-cleanup-job/templates/job.yaml @@ -92,18 +92,18 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_ERROR_TOPIC" value: {{ .Values.kafka.errorTopic | quote }} {{- end }} - {{- if and (hasKey .Values.kafka "namedOutputTopics") (.Values.kafka.namedOutputTopics) }} - - name: "{{ .Values.configurationEnvPrefix }}_NAMED_OUTPUT_TOPICS" - value: "{{- range $key, $value := .Values.kafka.namedOutputTopics }}{{ $key }}={{ $value }},{{- end }}" + {{- if and (hasKey .Values.kafka "labeledOutputTopics") (.Values.kafka.labeledOutputTopics) }} + - name: "{{ .Values.configurationEnvPrefix }}_LABELED_OUTPUT_TOPICS" + value: "{{- range $key, $value := .Values.kafka.labeledOutputTopics }}{{ $key }}={{ $value }},{{- end }}" {{- end }} {{- $delimiter := ";" }} - {{- if and (hasKey .Values.kafka "namedInputTopics") (.Values.kafka.namedInputTopics) }} - - name: "{{ .Values.configurationEnvPrefix }}_NAMED_INPUT_TOPICS" - value: "{{- range $key, $value := .Values.kafka.namedInputTopics }}{{ $key }}={{ $value | join $delimiter }},{{- end }}" + {{- if and (hasKey .Values.kafka "labeledInputTopics") (.Values.kafka.labeledInputTopics) }} + - name: "{{ .Values.configurationEnvPrefix }}_LABELED_INPUT_TOPICS" + value: "{{- range $key, $value := .Values.kafka.labeledInputTopics }}{{ $key }}={{ $value | join $delimiter }},{{- end }}" {{- end }} - {{- if and (hasKey .Values.kafka "namedInputPatterns") (.Values.kafka.namedInputPatterns) }} - - name: "{{ .Values.configurationEnvPrefix }}_NAMED_INPUT_PATTERNS" - value: "{{- range $key, $value := .Values.kafka.namedInputPatterns }}{{ $key }}={{ $value }},{{- end }}" + {{- if and (hasKey .Values.kafka "labeledInputPatterns") 
(.Values.kafka.labeledInputPatterns) }} + - name: "{{ .Values.configurationEnvPrefix }}_LABELED_INPUT_PATTERNS" + value: "{{- range $key, $value := .Values.kafka.labeledInputPatterns }}{{ $key }}={{ $value }},{{- end }}" {{- end }} {{- range $key, $value := .Values.secrets }} - name: "{{ $key }}" diff --git a/charts/streams-app-cleanup-job/values.yaml b/charts/streams-app-cleanup-job/values.yaml index 199e3441..5bc52d1e 100644 --- a/charts/streams-app-cleanup-job/values.yaml +++ b/charts/streams-app-cleanup-job/values.yaml @@ -25,16 +25,16 @@ kafka: inputTopics: [] # - input # - input2 - namedInputTopics: {} -# role: + labeledInputTopics: {} + # label: # - input # - input2 # inputPattern: .*-input - namedInputPatterns: {} -# role: .*-input + labeledInputPatterns: {} + # label: .*-input # outputTopic: output - namedOutputTopics: {} -# role: output + labeledOutputTopics: {} + # label: output # errorTopic: error deleteOutput: false diff --git a/charts/streams-app/README.md b/charts/streams-app/README.md index 0d0f5ccf..01513ac6 100644 --- a/charts/streams-app/README.md +++ b/charts/streams-app/README.md @@ -50,19 +50,19 @@ Alternatively, a YAML file that specifies the values for the parameters can be p ### Streams -| Parameter | Description | Default | -|----------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| -| `kafka.brokers` | Comma separated list of Kafka brokers to connect to. | | -| `kafka.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` | -| `kafka.staticMembership` | Whether to use [Kafka Static Group Membership](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances). | `false` | -| `kafka.config` | Configurations for your [Kafka Streams app](https://kafka.apache.org/documentation/#streamsconfigs). | `{}` | -| `kafka.inputTopics` | List of input topics for your streams application. | `[]` | -| `kafka.namedInputTopics` | Map of additional named input topics if you need to specify multiple topics with different message types. | `{}` | -| `kafka.inputPattern` | Input pattern of topics for your streams application. | | -| `kafka.namedInputPatterns` | Map of additional named input patterns if you need to specify multiple topics with different message types. | `{}` | -| `kafka.outputTopic` | Output topic for your streams application. | | -| `kafka.namedOutputTopics` | Map of additional named output topics if you need to specify multiple topics with different message types. | `{}` | -| `kafka.errorTopic` | Error topic for your streams application. | | +| Parameter | Description | Default | +|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| +| `kafka.brokers` | Comma separated list of Kafka brokers to connect to. | | +| `kafka.schemaRegistryUrl` | URL of Schema Registry to connect to. | `null` | +| `kafka.staticMembership` | Whether to use [Kafka Static Group Membership](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances). | `false` | +| `kafka.config` | Configurations for your [Kafka Streams app](https://kafka.apache.org/documentation/#streamsconfigs). 
| `{}` | +| `kafka.inputTopics` | List of input topics for your streams application. | `[]` | +| `kafka.labeledInputTopics` | Map of additional labeled input topics if you need to specify multiple topics with different message types. | `{}` | +| `kafka.inputPattern` | Input pattern of topics for your streams application. | | +| `kafka.labeledInputPatterns` | Map of additional labeled input patterns if you need to specify multiple topics with different message types. | `{}` | +| `kafka.outputTopic` | Output topic for your streams application. | | +| `kafka.labeledOutputTopics` | Map of additional labeled output topics if you need to specify multiple topics with different message types. | `{}` | +| `kafka.errorTopic` | Error topic for your streams application. | | ### Other diff --git a/charts/streams-app/templates/deployment.yaml b/charts/streams-app/templates/deployment.yaml index 7b8d987c..2bec40ae 100644 --- a/charts/streams-app/templates/deployment.yaml +++ b/charts/streams-app/templates/deployment.yaml @@ -139,18 +139,18 @@ spec: - name: "{{ .Values.configurationEnvPrefix }}_ERROR_TOPIC" value: {{ .Values.kafka.errorTopic | quote }} {{- end }} - {{- if and (hasKey .Values.kafka "namedOutputTopics") (.Values.kafka.namedOutputTopics) }} - - name: "{{ .Values.configurationEnvPrefix }}_NAMED_OUTPUT_TOPICS" - value: "{{- range $key, $value := .Values.kafka.namedOutputTopics }}{{ $key }}={{ $value }},{{- end }}" + {{- if and (hasKey .Values.kafka "labeledOutputTopics") (.Values.kafka.labeledOutputTopics) }} + - name: "{{ .Values.configurationEnvPrefix }}_LABELED_OUTPUT_TOPICS" + value: "{{- range $key, $value := .Values.kafka.labeledOutputTopics }}{{ $key }}={{ $value }},{{- end }}" {{- end }} {{- $delimiter := ";" }} - {{- if and (hasKey .Values.kafka "namedInputTopics") (.Values.kafka.namedInputTopics) }} - - name: "{{ .Values.configurationEnvPrefix }}_NAMED_INPUT_TOPICS" - value: "{{- range $key, $value := .Values.kafka.namedInputTopics }}{{ $key }}={{ $value | join $delimiter }},{{- end }}" + {{- if and (hasKey .Values.kafka "labeledInputTopics") (.Values.kafka.labeledInputTopics) }} + - name: "{{ .Values.configurationEnvPrefix }}_LABELED_INPUT_TOPICS" + value: "{{- range $key, $value := .Values.kafka.labeledInputTopics }}{{ $key }}={{ $value | join $delimiter }},{{- end }}" {{- end }} - {{- if and (hasKey .Values.kafka "namedInputPatterns") (.Values.kafka.namedInputPatterns) }} - - name: "{{ .Values.configurationEnvPrefix }}_NAMED_INPUT_PATTERNS" - value: "{{- range $key, $value := .Values.kafka.namedInputPatterns }}{{ $key }}={{ $value }},{{- end }}" + {{- if and (hasKey .Values.kafka "labeledInputPatterns") (.Values.kafka.labeledInputPatterns) }} + - name: "{{ .Values.configurationEnvPrefix }}_LABELED_INPUT_PATTERNS" + value: "{{- range $key, $value := .Values.kafka.labeledInputPatterns }}{{ $key }}={{ $value }},{{- end }}" {{- end }} {{- range $key, $value := .Values.secrets }} - name: "{{ $key }}" diff --git a/charts/streams-app/templates/scaled-object.yaml b/charts/streams-app/templates/scaled-object.yaml index a4f3fc1e..9810ddb0 100644 --- a/charts/streams-app/templates/scaled-object.yaml +++ b/charts/streams-app/templates/scaled-object.yaml @@ -26,8 +26,8 @@ spec: idleReplicaCount: {{ .Values.autoscaling.idleReplicas }} {{- end }} triggers: - {{- if not (or .Values.kafka.inputTopics .Values.autoscaling.internalTopics .Values.autoscaling.topics .Values.kafka.namedInputTopics .Values.autoscaling.additionalTriggers) }} - {{- fail "To use autoscaling, you must define one of 
.Values.kafka.inputTopics, .Values.autoscaling.internalTopics, .Values.autoscaling.topics, .Values.kafka.namedInputTopics or .Values.autoscaling.additionalTriggers" }} + {{- if not (or .Values.kafka.inputTopics .Values.autoscaling.internalTopics .Values.autoscaling.topics .Values.kafka.labeledInputTopics .Values.autoscaling.additionalTriggers) }} + {{- fail "To use autoscaling, you must define one of .Values.kafka.inputTopics, .Values.autoscaling.internalTopics, .Values.autoscaling.topics, .Values.kafka.labeledInputTopics or .Values.autoscaling.additionalTriggers" }} {{- end}} # todo: concat .Values.kafka.inputTopics and .Values.autoscaling.topics to # minimize number of loops when we don't need to support helm 2 anymore @@ -58,7 +58,7 @@ spec: lagThreshold: {{ $root.Values.autoscaling.lagThreshold | quote }} offsetResetPolicy: {{ $root.Values.autoscaling.offsetResetPolicy }} {{- end }} - {{- range $key, $value := .Values.kafka.namedInputTopics }} + {{- range $key, $value := .Values.kafka.labeledInputTopics }} {{- range $topic := $value }} - type: kafka metadata: diff --git a/charts/streams-app/values.yaml b/charts/streams-app/values.yaml index a099ca1d..92695bf0 100644 --- a/charts/streams-app/values.yaml +++ b/charts/streams-app/values.yaml @@ -34,16 +34,16 @@ kafka: inputTopics: [] # - input # - input2 - namedInputTopics: {} - # role: + labeledInputTopics: {} + # label: # - input # - input2 # inputPattern: .*-input - namedInputPatterns: {} - # role: .*-input + labeledInputPatterns: {} + # label: .*-input # outputTopic: output - namedOutputTopics: {} - # role: output + labeledOutputTopics: {} + # label: output # errorTopic: error commandLine: {} @@ -105,7 +105,7 @@ autoscaling: minReplicas: 0 maxReplicas: 1 # idleReplicas: 0 - ## all topics from kafka.inputTopics and kafka.namedInputTopics are automatically taken + ## all topics from kafka.inputTopics and kafka.labeledInputTopics are automatically taken ## only use the 'internalTopics' option for adding internal topics, i.e., auto-generated topics by Kafka Streams. Consumer group name will automatically be added as a prefix internalTopics: [] # - bar-repartition # results in foo-bar-repartition diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index 45cc6cf1..c6241b3f 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -52,7 +52,7 @@ *
 * <ul>
 *     <li>{@link #brokers}</li>
 *     <li>{@link #outputTopic}</li>
- *     <li>{@link #namedOutputTopics}</li>
+ *     <li>{@link #labeledOutputTopics}</li>
 *     <li>{@link #brokers}</li>
 *     <li>{@link #schemaRegistryUrl}</li>
 *     <li>{@link #kafkaConfig}</li>
  • @@ -85,8 +85,9 @@ public abstract class KafkaApplication activeApps = new ConcurrentLinkedDeque<>(); @CommandLine.Option(names = "--output-topic", description = "Output topic") private String outputTopic; - @CommandLine.Option(names = "--named-output-topics", split = ",", description = "Additional named output topics") - private Map namedOutputTopics = emptyMap(); + @CommandLine.Option(names = "--labeled-output-topics", split = ",", + description = "Additional labeled output topics") + private Map labeledOutputTopics = emptyMap(); @CommandLine.Option(names = "--brokers", required = true, description = "Broker addresses to connect to") private String brokers; @CommandLine.Option(names = "--schema-registry-url", description = "URL of Schema Registry") diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java index 06d7fde2..87b5a53d 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java @@ -68,7 +68,7 @@ public final Optional createExecutionOptions() { public final ProducerTopicConfig createTopicConfig() { return ProducerTopicConfig.builder() .outputTopic(this.getOutputTopic()) - .namedOutputTopics(this.getNamedOutputTopics()) + .labeledOutputTopics(this.getLabeledOutputTopics()) .build(); } diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index eba6c373..95b6a679 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -52,8 +52,8 @@ *
 *     <li>{@link #inputTopics}</li>
 *     <li>{@link #inputPattern}</li>
 *     <li>{@link #errorTopic}</li>
- *     <li>{@link #namedInputTopics}</li>
- *     <li>{@link #namedInputPatterns}</li>
+ *     <li>{@link #labeledInputTopics}</li>
+ *     <li>{@link #labeledInputPatterns}</li>
 *     <li>{@link #volatileGroupInstanceId}</li>
 * </ul>
* To implement your Kafka Streams application inherit from this class and add your custom options. Run it by calling @@ -74,11 +74,12 @@ public abstract class KafkaStreamsApplication extends private Pattern inputPattern; @CommandLine.Option(names = "--error-topic", description = "Error topic") private String errorTopic; - @CommandLine.Option(names = "--named-input-topics", split = ",", description = "Additional named input topics", + @CommandLine.Option(names = "--labeled-input-topics", split = ",", description = "Additional labeled input topics", converter = {UseDefaultConverter.class, StringListConverter.class}) - private Map> namedInputTopics = emptyMap(); - @CommandLine.Option(names = "--named-input-patterns", split = ",", description = "Additional named input patterns") - private Map namedInputPatterns = emptyMap(); + private Map> labeledInputTopics = emptyMap(); + @CommandLine.Option(names = "--labeled-input-patterns", split = ",", + description = "Additional labeled input patterns") + private Map labeledInputPatterns = emptyMap(); @CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1", description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.") private boolean volatileGroupInstanceId; @@ -122,11 +123,11 @@ public final Optional createExecutionOptions() { public final StreamsTopicConfig createTopicConfig() { return StreamsTopicConfig.builder() .inputTopics(this.inputTopics) - .namedInputTopics(this.namedInputTopics) + .labeledInputTopics(this.labeledInputTopics) .inputPattern(this.inputPattern) - .namedInputPatterns(this.namedInputPatterns) + .labeledInputPatterns(this.labeledInputPatterns) .outputTopic(this.getOutputTopic()) - .namedOutputTopics(this.getNamedOutputTopics()) + .labeledOutputTopics(this.getLabeledOutputTopics()) .errorTopic(this.errorTopic) .build(); } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index c57eb7d1..54dfacd2 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -328,31 +328,31 @@ public void run() { "--brokers", "brokers", "--schema-registry-url", "schema-registry", "--input-topics", "input1,input2", - "--named-input-topics", "role1=input3,role2=input4;input5", + "--labeled-input-topics", "label1=input3,label2=input4;input5", "--input-pattern", ".*", - "--named-input-patterns", "role1=.+,role2=\\d+", + "--labeled-input-patterns", "label1=.+,label2=\\d+", "--output-topic", "output1", - "--named-output-topics", "role1=output2,role2=output3", + "--labeled-output-topics", "label1=output2,label2=output3", "--kafka-config", "foo=1,bar=2", }); assertThat(app.getInputTopics()).containsExactly("input1", "input2"); - assertThat(app.getNamedInputTopics()) + assertThat(app.getLabeledInputTopics()) .hasSize(2) - .containsEntry("role1", List.of("input3")) - .containsEntry("role2", List.of("input4", "input5")); + .containsEntry("label1", List.of("input3")) + .containsEntry("label2", List.of("input4", "input5")); assertThat(app.getInputPattern()) .satisfies(pattern -> assertThat(pattern.pattern()).isEqualTo(Pattern.compile(".*").pattern())); - assertThat(app.getNamedInputPatterns()) + assertThat(app.getLabeledInputPatterns()) .hasSize(2) - .hasEntrySatisfying("role1", + .hasEntrySatisfying("label1", pattern -> assertThat(pattern.pattern()).isEqualTo(Pattern.compile(".+").pattern())) - 
.hasEntrySatisfying("role2", + .hasEntrySatisfying("label2", pattern -> assertThat(pattern.pattern()).isEqualTo(Pattern.compile("\\d+").pattern())); assertThat(app.getOutputTopic()).isEqualTo("output1"); - assertThat(app.getNamedOutputTopics()) + assertThat(app.getLabeledOutputTopics()) .hasSize(2) - .containsEntry("role1", "output2") - .containsEntry("role2", "output3"); + .containsEntry("label1", "output2") + .containsEntry("label2", "output3"); assertThat(app.getKafkaConfig()) .hasSize(2) .containsEntry("foo", "1") diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java index d4da8f27..219ae31e 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerCleanUpRunner.java @@ -107,7 +107,7 @@ private void deleteTopic(final String topic) { private Iterable getAllOutputTopics() { return Seq.of(ProducerCleanUpRunner.this.topics.getOutputTopic()) - .concat(ProducerCleanUpRunner.this.topics.getNamedOutputTopics().values()); + .concat(ProducerCleanUpRunner.this.topics.getLabeledOutputTopics().values()); } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerTopicConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerTopicConfig.java index 21583c90..87b33023 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerTopicConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducerTopicConfig.java @@ -43,21 +43,21 @@ public class ProducerTopicConfig { String outputTopic; /** - * Named output topics that are identified by a role + * Output topics that are identified by a label */ @Builder.Default @NonNull - Map namedOutputTopics = emptyMap(); + Map labeledOutputTopics = emptyMap(); /** - * Get named output topic for a specified role + * Get output topic for a specified label * - * @param role role of named output topic + * @param label label of output topic * @return topic name */ - public String getOutputTopic(final String role) { - final String topic = this.namedOutputTopics.get(role); - Preconditions.checkNotNull(topic, "No output topic for role '%s' available", role); + public String getOutputTopic(final String label) { + final String topic = this.labeledOutputTopics.get(label); + Preconditions.checkNotNull(topic, "No output topic for label '%s' available", label); return topic; } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsTopicConfig.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsTopicConfig.java index 47d9b9d0..080b4b9d 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsTopicConfig.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsTopicConfig.java @@ -47,60 +47,60 @@ public class StreamsTopicConfig { @Builder.Default @NonNull List inputTopics = emptyList(); /** - * Named input topics that are identified by a role + * Input topics that are identified by a label */ @Builder.Default @NonNull - Map> namedInputTopics = emptyMap(); + Map> labeledInputTopics = emptyMap(); Pattern inputPattern; /** - * Named input patterns that are identified by a role + * Input patterns that are identified by a label */ @Builder.Default @NonNull - Map namedInputPatterns = emptyMap(); + Map labeledInputPatterns = emptyMap(); String outputTopic; /** - * Named output topics that are identified by a role + * 
Output topics that are identified by a label */ @Builder.Default @NonNull - Map namedOutputTopics = emptyMap(); + Map labeledOutputTopics = emptyMap(); String errorTopic; /** - * Get named input topics for a specified role + * Get input topics for a specified label * - * @param role role of named input topics + * @param label label of input topics * @return topic names */ - public List getInputTopics(final String role) { - final List topics = this.namedInputTopics.get(role); - Preconditions.checkNotNull(topics, "No input topics for role '%s' available", role); + public List getInputTopics(final String label) { + final List topics = this.labeledInputTopics.get(label); + Preconditions.checkNotNull(topics, "No input topics for label '%s' available", label); return topics; } /** - * Get named input pattern for a specified role + * Get input pattern for a specified label * - * @param role role of named input pattern + * @param label label of input pattern * @return topic pattern */ - public Pattern getInputPattern(final String role) { - final Pattern pattern = this.namedInputPatterns.get(role); - Preconditions.checkNotNull(pattern, "No input pattern for role '%s' available", role); + public Pattern getInputPattern(final String label) { + final Pattern pattern = this.labeledInputPatterns.get(label); + Preconditions.checkNotNull(pattern, "No input pattern for label '%s' available", label); return pattern; } /** - * Get named output topic for a specified role + * Get output topic for a specified label * - * @param role role of named output topic + * @param label label of output topic * @return topic name */ - public String getOutputTopic(final String role) { - final String topic = this.namedOutputTopics.get(role); - Preconditions.checkNotNull(topic, "No output topic for role '%s' available", role); + public String getOutputTopic(final String label) { + final String topic = this.labeledOutputTopics.get(label); + Preconditions.checkNotNull(topic, "No output topic for label '%s' available", label); return topic; } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/TopologyBuilder.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/TopologyBuilder.java index 2e8da9bc..13717ead 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/TopologyBuilder.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/TopologyBuilder.java @@ -71,25 +71,25 @@ public KStream streamInput() { /** * Create a {@code KStream} from all {@link StreamsTopicConfig#getInputTopics(String)} - * @param role role of named input topics + * @param label label of input topics * @param consumed define optional parameters for streaming topics * @return a {@code KStream} for all {@link StreamsTopicConfig#getInputTopics(String)} * @param type of keys * @param type of values */ - public KStream streamInput(final String role, final Consumed consumed) { - return this.streamsBuilder.stream(this.topics.getInputTopics(role), consumed); + public KStream streamInput(final String label, final Consumed consumed) { + return this.streamsBuilder.stream(this.topics.getInputTopics(label), consumed); } /** * Create a {@code KStream} from all {@link StreamsTopicConfig#getInputTopics(String)} - * @param role role of named input topics + * @param label label of input topics * @return a {@code KStream} for all {@link StreamsTopicConfig#getInputTopics(String)} * @param type of keys * @param type of values */ - public KStream streamInput(final String role) { - return 
this.streamsBuilder.stream(this.topics.getInputTopics(role)); + public KStream streamInput(final String label) { + return this.streamsBuilder.stream(this.topics.getInputTopics(label)); } /** @@ -115,25 +115,25 @@ public KStream streamInputPattern() { /** * Create a {@code KStream} from all topics matching {@link StreamsTopicConfig#getInputPattern(String)} - * @param role role of named input pattern + * @param label label of input pattern * @param consumed define optional parameters for streaming topics * @return a {@code KStream} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)} * @param type of keys * @param type of values */ - public KStream streamInputPattern(final String role, final Consumed consumed) { - return this.streamsBuilder.stream(this.topics.getInputPattern(role), consumed); + public KStream streamInputPattern(final String label, final Consumed consumed) { + return this.streamsBuilder.stream(this.topics.getInputPattern(label), consumed); } /** * Create a {@code KStream} from all topics matching {@link StreamsTopicConfig#getInputPattern(String)} - * @param role role of named input pattern + * @param label label of input pattern * @return a {@code KStream} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)} * @param type of keys * @param type of values */ - public KStream streamInputPattern(final String role) { - return this.streamsBuilder.stream(this.topics.getInputPattern(role)); + public KStream streamInputPattern(final String label) { + return this.streamsBuilder.stream(this.topics.getInputPattern(label)); } /** diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java index 23e778d7..2e6b3f26 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java @@ -37,8 +37,8 @@ import com.bakdata.kafka.StreamsRunner; import com.bakdata.kafka.StreamsTopicConfig; import com.bakdata.kafka.TopologyBuilder; +import com.bakdata.kafka.test_applications.LabeledInputTopics; import com.bakdata.kafka.test_applications.Mirror; -import com.bakdata.kafka.test_applications.NamedInputTopics; import java.lang.Thread.UncaughtExceptionHandler; import java.util.List; import java.util.Map; @@ -107,9 +107,9 @@ private static ConfiguredStreamsApp createMirrorApplication() { .build()); } - private static ConfiguredStreamsApp createNamedInputTopicsApplication() { - return configureApp(new NamedInputTopics(), StreamsTopicConfig.builder() - .namedInputTopics(Map.of("role", List.of("input1", "input2"))) + private static ConfiguredStreamsApp createLabeledInputTopicsApplication() { + return configureApp(new LabeledInputTopics(), StreamsTopicConfig.builder() + .labeledInputTopics(Map.of("label", List.of("input1", "input2"))) .outputTopic("output") .build()); } @@ -147,11 +147,11 @@ void shouldRunApp() throws InterruptedException { } @Test - void shouldUseMultipleNamedInputTopics() throws InterruptedException { - try (final ConfiguredStreamsApp app = createNamedInputTopicsApplication(); + void shouldUseMultiplelabeledInputTopics() throws InterruptedException { + try (final ConfiguredStreamsApp app = createLabeledInputTopicsApplication(); final StreamsRunner runner = app.withEndpoint(this.createEndpointWithoutSchemaRegistry()) .createRunner()) { - final List inputTopics = 
app.getTopics().getNamedInputTopics().get("role"); + final List inputTopics = app.getTopics().getLabeledInputTopics().get("label"); final String inputTopic1 = inputTopics.get(0); final String inputTopic2 = inputTopics.get(1); final String outputTopic = app.getTopics().getOutputTopic(); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/NamedInputTopics.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/LabeledInputTopics.java similarity index 96% rename from streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/NamedInputTopics.java rename to streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/LabeledInputTopics.java index 6ff3ec72..51087643 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/NamedInputTopics.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/test_applications/LabeledInputTopics.java @@ -33,10 +33,10 @@ import org.apache.kafka.streams.kstream.KStream; @NoArgsConstructor -public class NamedInputTopics implements StreamsApp { +public class LabeledInputTopics implements StreamsApp { @Override public void buildTopology(final TopologyBuilder builder) { - final KStream input = builder.streamInput("role"); + final KStream input = builder.streamInput("label"); input.to(builder.getTopics().getOutputTopic()); }
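For reference, a minimal sketch of how the renamed topic-config and topology API fit together, mirroring `LabeledInputTopics` and `StreamsRunnerTest` above; the class name and topic names are illustrative, and plain static helpers stand in for a full `StreamsApp` implementation:

```java
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import java.util.List;
import java.util.Map;
import org.apache.kafka.streams.kstream.KStream;

// Illustrative only: shows how topics registered under a label are resolved by that label.
class LabeledTopicsExample {

    // Analogous to StreamsRunnerTest: two input topics share the label "label".
    static StreamsTopicConfig topicConfig() {
        return StreamsTopicConfig.builder()
                .labeledInputTopics(Map.of("label", List.of("input1", "input2")))
                .outputTopic("output")
                .build();
    }

    // Analogous to LabeledInputTopics: stream everything behind the label and
    // forward it to the (unlabeled) output topic.
    static void buildTopology(final TopologyBuilder builder) {
        final KStream<String, String> input = builder.streamInput("label");
        input.to(builder.getTopics().getOutputTopic());
    }
}
```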