Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 19, 2024
1 parent af81485 commit 6cdb0d9
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@

package com.bakdata.kafka;

import static java.util.Collections.emptyMap;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -87,15 +88,15 @@ public abstract class KafkaApplication<R extends Runner, CR extends CleanUpRunne
@CommandLine.Option(names = "--output-topic", description = "Output topic")
private String outputTopic;
@CommandLine.Option(names = "--extra-output-topics", split = ",", description = "Additional named output topics")
private Map<String, String> extraOutputTopics = new HashMap<>();
private Map<String, String> extraOutputTopics = emptyMap();
@CommandLine.Option(names = "--brokers", required = true, description = "Broker addresses to connect to")
private String brokers;
@CommandLine.Option(names = "--debug", arity = "0..1", description = "Configure logging to debug")
private boolean debug;
@CommandLine.Option(names = "--schema-registry-url", description = "URL of Schema Registry")
private String schemaRegistryUrl;
@CommandLine.Option(names = "--kafka-config", split = ",", description = "Additional Kafka properties")
private Map<String, String> kafkaConfig = new HashMap<>();
private Map<String, String> kafkaConfig = emptyMap();

/**
* <p>This methods needs to be called in the executable custom application class inheriting from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@

package com.bakdata.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;

import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -67,16 +68,16 @@ public abstract class KafkaStreamsApplication extends
KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions,
ExecutableStreamsApp<StreamsApp>, ConfiguredStreamsApp<StreamsApp>, StreamsTopicConfig, StreamsApp> {
@CommandLine.Option(names = "--input-topics", description = "Input topics", split = ",")
private List<String> inputTopics = new ArrayList<>();
private List<String> inputTopics = emptyList();
@CommandLine.Option(names = "--input-pattern", description = "Input pattern")
private Pattern inputPattern;
@CommandLine.Option(names = "--error-topic", description = "Error topic")
private String errorTopic;
@CommandLine.Option(names = "--extra-input-topics", split = ",", description = "Additional named input topics",
converter = {UseDefaultConverter.class, StringListConverter.class})
private Map<String, List<String>> extraInputTopics = new HashMap<>();
private Map<String, List<String>> extraInputTopics = emptyMap();
@CommandLine.Option(names = "--extra-input-patterns", split = ",", description = "Additional named input patterns")
private Map<String, Pattern> extraInputPatterns = new HashMap<>();
private Map<String, Pattern> extraInputPatterns = emptyMap();
@CommandLine.Option(names = "--volatile-group-instance-id", arity = "0..1",
description = "Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.")
private boolean volatileGroupInstanceId;
Expand Down

0 comments on commit 6cdb0d9

Please sign in to comment.