From d7b270992db315f0535cfe712284637f51f3943d Mon Sep 17 00:00:00 2001 From: ekawinataa Date: Mon, 16 Sep 2024 19:03:47 +0700 Subject: [PATCH] feat: Add dynamic source and sink kafka properties (#31) * add kafka security module * aDD SASL class callback config for producer and consumer * Add config map * remove build.gradle * Add dynamic props * Update regex * rename var * Remove redundant imports * Rename prefix * Remove unused import * Update test * Add implementation for sink dynamic props * Add null checking for the additional props * Added validations on source config * Add docs and refactor pattern to enum * chECKSTYLE * Add readme * Make the pattern more specific and embedded to the enum * Add more test * bump version * Add unit tests * Use expected annotation * Assert exception message. Add fail mechanism in case of not throwing any exception * Use rule for asserting exception * Add more test case * add more unit test * feat: Enable multiple underscore parsing * test: Add test on multiple underscore parsing --- .../KafkaConnectorTypesMetadata.java | 18 +++ .../dagger/core/sink/SinkOrchestrator.java | 9 +- .../core/source/config/StreamConfig.java | 19 ++- ...nsumerAdditionalConfigurationsAdaptor.java | 40 ++++++ .../dagger/core/utils/Constants.java | 2 +- .../dagger/core/utils/KafkaConfigUtil.java | 28 ++++ .../core/sink/SinkOrchestratorTest.java | 8 ++ .../core/source/config/StreamConfigTest.java | 36 +++++ ...erAdditionalConfigurationsAdaptorTest.java | 132 ++++++++++++++++++ .../core/utils/KafkaConfigUtilTest.java | 105 ++++++++++++++ docs/docs/guides/kafka.md | 41 ++++++ version.txt | 2 +- 12 files changed, 434 insertions(+), 6 deletions(-) create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/enumeration/KafkaConnectorTypesMetadata.java create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/source/config/adapter/DaggerKafkaConsumerAdditionalConfigurationsAdaptor.java create mode 100644 dagger-core/src/main/java/com/gotocompany/dagger/core/utils/KafkaConfigUtil.java create mode 100644 dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/adapter/DaggerKafkaConsumerAdditionalConfigurationsAdaptorTest.java create mode 100644 dagger-core/src/test/java/com/gotocompany/dagger/core/utils/KafkaConfigUtilTest.java create mode 100644 docs/docs/guides/kafka.md diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/enumeration/KafkaConnectorTypesMetadata.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/enumeration/KafkaConnectorTypesMetadata.java new file mode 100644 index 000000000..9a7739df0 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/enumeration/KafkaConnectorTypesMetadata.java @@ -0,0 +1,18 @@ +package com.gotocompany.dagger.core.enumeration; + +import java.util.regex.Pattern; + +public enum KafkaConnectorTypesMetadata { + SOURCE("SOURCE_KAFKA_CONSUMER_CONFIG_+"), SINK("SINK_KAFKA_PRODUCER_CONFIG_+"); + + KafkaConnectorTypesMetadata(String prefixPattern) { + this.prefixPattern = prefixPattern; + } + + private final String prefixPattern; + + public Pattern getConfigurationPattern() { + return Pattern.compile(String.format("^%s(.*)", prefixPattern), Pattern.CASE_INSENSITIVE); + } + +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java index 7729ced7a..05997e445 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java +++ 
b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java @@ -1,5 +1,6 @@ package com.gotocompany.dagger.core.sink; +import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata; import com.gotocompany.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter; import com.gotocompany.dagger.core.metrics.telemetry.TelemetryPublisher; import com.gotocompany.dagger.core.metrics.telemetry.TelemetryTypes; @@ -7,8 +8,10 @@ import com.gotocompany.dagger.core.sink.influx.ErrorHandler; import com.gotocompany.dagger.core.sink.influx.InfluxDBFactoryWrapper; import com.gotocompany.dagger.core.sink.influx.InfluxDBSink; +import com.gotocompany.dagger.core.utils.KafkaConfigUtil; import com.gotocompany.dagger.core.utils.Constants; import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase; @@ -25,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; /** @@ -109,7 +113,10 @@ protected Properties getProducerProperties(Configuration configuration) { String lingerMs = configuration.getString(Constants.SINK_KAFKA_LINGER_MS_KEY, Constants.SINK_KAFKA_LINGER_MS_DEFAULT); validateLingerMs(lingerMs); kafkaProducerConfigs.setProperty(Constants.SINK_KAFKA_LINGER_MS_CONFIG_KEY, lingerMs); - + Properties dynamicProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SINK, Optional.ofNullable(configuration.getParam()) + .map(ParameterTool::getProperties) + .orElseGet(Properties::new)); + kafkaProducerConfigs.putAll(dynamicProperties); return kafkaProducerConfigs; } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/source/config/StreamConfig.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/source/config/StreamConfig.java index d6f1c1af7..6ade4a723 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/source/config/StreamConfig.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/source/config/StreamConfig.java @@ -1,6 +1,8 @@ package com.gotocompany.dagger.core.source.config; import com.google.gson.annotations.JsonAdapter; +import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata; +import com.gotocompany.dagger.core.source.config.adapter.DaggerKafkaConsumerAdditionalConfigurationsAdaptor; import com.gotocompany.dagger.core.source.config.adapter.DaggerSASLMechanismAdaptor; import com.gotocompany.dagger.core.source.config.adapter.DaggerSSLKeyStoreFileTypeAdaptor; import com.gotocompany.dagger.core.source.config.adapter.DaggerSSLProtocolAdaptor; @@ -14,6 +16,7 @@ import com.gotocompany.dagger.core.source.config.models.SourceDetails; import com.gotocompany.dagger.core.source.config.models.SourceName; import com.gotocompany.dagger.core.source.config.models.TimeRangePool; +import com.gotocompany.dagger.core.utils.KafkaConfigUtil; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import com.google.gson.Gson; @@ -26,6 +29,7 @@ import java.io.StringReader; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -34,7 +38,6 @@ import static com.gotocompany.dagger.common.core.Constants.STREAM_INPUT_SCHEMA_PROTO_CLASS; import static 
com.gotocompany.dagger.common.core.Constants.STREAM_INPUT_SCHEMA_TABLE; import static com.gotocompany.dagger.core.utils.Constants.*; -import static com.gotocompany.dagger.core.utils.Constants.STREAM_SOURCE_PARQUET_FILE_DATE_RANGE_KEY; public class StreamConfig { private static final Gson GSON = new GsonBuilder() @@ -154,6 +157,11 @@ public class StreamConfig { @Getter private SourceParquetSchemaMatchStrategy parquetSchemaMatchStrategy; + @SerializedName(SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS) + @JsonAdapter(value = DaggerKafkaConsumerAdditionalConfigurationsAdaptor.class) + @Getter + private Map additionalConsumerConfigurations; + @SerializedName(STREAM_SOURCE_PARQUET_FILE_DATE_RANGE_KEY) @JsonAdapter(FileDateRangeAdaptor.class) @Getter @@ -208,7 +216,7 @@ public Properties getKafkaProps(Configuration configuration) { .stream() .filter(e -> e.getKey().toLowerCase().startsWith(KAFKA_PREFIX)) .forEach(e -> kafkaProps.setProperty(parseVarName(e.getKey(), KAFKA_PREFIX), e.getValue())); - setAdditionalConfigs(kafkaProps, configuration); + setAdditionalKafkaConsumerConfigs(kafkaProps, configuration); return kafkaProps; } @@ -217,10 +225,15 @@ private String parseVarName(String varName, String kafkaPrefix) { return String.join(".", names); } - private void setAdditionalConfigs(Properties kafkaProps, Configuration configuration) { + private void setAdditionalKafkaConsumerConfigs(Properties kafkaProps, Configuration configuration) { if (configuration.getBoolean(SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_KEY, SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_DEFAULT)) { kafkaProps.setProperty(SOURCE_KAFKA_MAX_PARTITION_FETCH_BYTES_KEY, SOURCE_KAFKA_MAX_PARTITION_FETCH_BYTES_DEFAULT); } + if (Objects.nonNull(this.additionalConsumerConfigurations)) { + Properties additionalKafkaProperties = new Properties(); + additionalKafkaProperties.putAll(this.additionalConsumerConfigurations); + kafkaProps.putAll(KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, additionalKafkaProperties)); + } } public Pattern getTopicPattern() { diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/source/config/adapter/DaggerKafkaConsumerAdditionalConfigurationsAdaptor.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/source/config/adapter/DaggerKafkaConsumerAdditionalConfigurationsAdaptor.java new file mode 100644 index 000000000..15b7f83e9 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/source/config/adapter/DaggerKafkaConsumerAdditionalConfigurationsAdaptor.java @@ -0,0 +1,40 @@ +package com.gotocompany.dagger.core.source.config.adapter; + +import com.google.gson.Gson; +import com.google.gson.TypeAdapter; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; +import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class DaggerKafkaConsumerAdditionalConfigurationsAdaptor extends TypeAdapter> { + + @Override + public void write(JsonWriter jsonWriter, Map stringStringMap) throws IOException { + Gson gson = new Gson(); + jsonWriter.jsonValue(gson.toJson(stringStringMap)); + } + + @Override + public Map read(JsonReader jsonReader) throws IOException { + Gson gson = new Gson(); + Map map = gson.fromJson(jsonReader, Map.class); + List invalidProps = map.keySet().stream() + .filter(key -> !KafkaConnectorTypesMetadata.SOURCE.getConfigurationPattern() + .matcher(key) + .matches()) + 
.collect(Collectors.toList()); + if (!invalidProps.isEmpty()) { + throw new IllegalArgumentException("Invalid additional kafka consumer configuration properties found: " + invalidProps); + } + return map.entrySet() + .stream() + .filter(entry -> entry.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java index 2c2dc6f33..78fdbe707 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java @@ -129,7 +129,7 @@ public class Constants { public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL"; public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM"; public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG"; - + public static final String SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS = "SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS"; public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD"; public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION"; public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD"; diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/KafkaConfigUtil.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/KafkaConfigUtil.java new file mode 100644 index 000000000..14539a01c --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/KafkaConfigUtil.java @@ -0,0 +1,28 @@ +package com.gotocompany.dagger.core.utils; + +import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata; + +import java.util.Properties; +import java.util.Set; +import java.util.regex.Matcher; + +public class KafkaConfigUtil { + + public static Properties parseKafkaConfiguration(KafkaConnectorTypesMetadata kafkaConnectorTypesMetadata, Properties properties) { + Properties kafkaProperties = new Properties(); + Set configKeys = properties.keySet(); + + for (Object key : configKeys) { + Matcher matcher = kafkaConnectorTypesMetadata.getConfigurationPattern() + .matcher(key.toString()); + if (matcher.find()) { + String kafkaConfigKey = matcher.group(1) + .toLowerCase() + .replaceAll("_+", "."); + kafkaProperties.setProperty(kafkaConfigKey, properties.get(key).toString()); + } + } + return kafkaProperties; + } + +} diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java index 9d97466c6..1a660c543 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java @@ -26,6 +26,9 @@ public class SinkOrchestratorTest { + private static final String SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS = "SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS"; + private static final String SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE = 
"com.gotocompany.dagger.core.utils.SinkKafkaConfigUtil"; + private Configuration configuration; private StencilClientOrchestrator stencilClientOrchestrator; private SinkOrchestrator sinkOrchestrator; @@ -71,14 +74,19 @@ public void shouldGiveInfluxWhenConfiguredToUseNothing() throws Exception { @Test public void shouldSetKafkaProducerConfigurations() throws Exception { + Map additionalParameters = new HashMap<>(); + additionalParameters.put(SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS, SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE); when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668"); when(configuration.getBoolean(eq(Constants.SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY), anyBoolean())).thenReturn(true); when(configuration.getString(eq(Constants.SINK_KAFKA_LINGER_MS_KEY), anyString())).thenReturn("1000"); + when(configuration.getParam()).thenReturn(ParameterTool.fromMap(additionalParameters)); + when(configuration.getString(eq(SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS), anyString())).thenReturn(SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE); Properties producerProperties = sinkOrchestrator.getProducerProperties(configuration); assertEquals(producerProperties.getProperty("compression.type"), "snappy"); assertEquals(producerProperties.getProperty("max.request.size"), "20971520"); assertEquals(producerProperties.getProperty("linger.ms"), "1000"); + assertEquals(producerProperties.getProperty("sasl.login.callback.handler.class"), SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE); } @Test diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java index a16961d14..23527ea52 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java @@ -229,6 +229,42 @@ public void shouldParseMultipleStreamsFromStreamConfigWithSASLConfig() { assertEquals("local-kafka-stream", currConfigNext.getKafkaName()); } + @Test + public void shouldParseMultipleAdditionalConsumerConfigs() { + when(configuration.getString(INPUT_STREAMS, "")) + .thenReturn("[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL\": \"SASL_PLAINTEXT\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG\":\"org.apache.kafka.common.security.scram.ScramLoginModule required username=\\\"username\\\" password=\\\"password\\\";\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"false\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location\"} }, {\"INPUT_SCHEMA_TABLE\": \"data_stream_1\", \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_DATATYPE\": \"JSON\", \"INPUT_SCHEMA_JSON_SCHEMA\": \"{ \\\"$schema\\\": 
\\\"https://json-schema.org/draft/2020-12/schema\\\", \\\"$id\\\": \\\"https://example.com/product.schema.json\\\", \\\"title\\\": \\\"Product\\\", \\\"description\\\": \\\"A product from Acme's catalog\\\", \\\"type\\\": \\\"object\\\", \\\"properties\\\": { \\\"id\\\": { \\\"description\\\": \\\"The unique identifier for a product\\\", \\\"type\\\": \\\"string\\\" }, \\\"time\\\": { \\\"description\\\": \\\"event timestamp of the event\\\", \\\"type\\\": \\\"string\\\", \\\"format\\\" : \\\"date-time\\\" } }, \\\"required\\\": [ \\\"id\\\", \\\"time\\\" ] }\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"true\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key_2\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location_2\", \"SOURCE_KAFKA_CONSUMER_CONFIG_OFFSET_FLUSH_INTERVAL_MS\":\"1000\"} } ]"); + StreamConfig[] streamConfigs = StreamConfig.parse(configuration); + StreamConfig firstStreamConfig = streamConfigs[0]; + StreamConfig secondStreamConfig = streamConfigs[1]; + + Properties firstStreamProperties = firstStreamConfig.getKafkaProps(configuration); + Properties secondStreamProperties = secondStreamConfig.getKafkaProps(configuration); + + assertEquals("ssl_keystore_key", firstStreamProperties.getProperty("ssl.keystore.key")); + assertEquals("ssl_keystore_location", firstStreamProperties.getProperty("ssl.keystore.location")); + assertEquals("ssl_keystore_key_2", secondStreamProperties.getProperty("ssl.keystore.key")); + assertEquals("ssl_keystore_location_2", secondStreamProperties.getProperty("ssl.keystore.location")); + assertEquals("1000", secondStreamProperties.getProperty("offset.flush.interval.ms")); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionIfAdditionalKafkaPropsNotMatchingPrefix() { + String streamConfig = "[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL\": \"SASL_PLAINTEXT\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG\":\"org.apache.kafka.common.security.scram.ScramLoginModule required username=\\\"username\\\" password=\\\"password\\\";\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"false\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location\"} }, {\"INPUT_SCHEMA_TABLE\": \"data_stream_1\", \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_DATATYPE\": \"JSON\", \"INPUT_SCHEMA_JSON_SCHEMA\": \"{ \\\"$schema\\\": \\\"https://json-schema.org/draft/2020-12/schema\\\", \\\"$id\\\": 
\\\"https://example.com/product.schema.json\\\", \\\"title\\\": \\\"Product\\\", \\\"description\\\": \\\"A product from Acme's catalog\\\", \\\"type\\\": \\\"object\\\", \\\"properties\\\": { \\\"id\\\": { \\\"description\\\": \\\"The unique identifier for a product\\\", \\\"type\\\": \\\"string\\\" }, \\\"time\\\": { \\\"description\\\": \\\"event timestamp of the event\\\", \\\"type\\\": \\\"string\\\", \\\"format\\\" : \\\"date-time\\\" } }, \\\"required\\\": [ \\\"id\\\", \\\"time\\\" ] }\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"true\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key_2\", \"CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location_2\", \"SOURCE_KAFKA_CONSUMER_CONFIG_OFFSET_FLUSH_INTERVAL_MS\":\"1000\"} } ]"; + when(configuration.getString(INPUT_STREAMS, "")) + .thenReturn(streamConfig); + + StreamConfig.parse(configuration); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionIfAdditionalKafkaPropsNotMatchingPrefixMissingUnderscore() { + String streamConfig = "[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL\": \"SASL_PLAINTEXT\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG\":\"org.apache.kafka.common.security.scram.ScramLoginModule required username=\\\"username\\\" password=\\\"password\\\";\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"false\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location\"} }, {\"INPUT_SCHEMA_TABLE\": \"data_stream_1\", \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_DATATYPE\": \"JSON\", \"INPUT_SCHEMA_JSON_SCHEMA\": \"{ \\\"$schema\\\": \\\"https://json-schema.org/draft/2020-12/schema\\\", \\\"$id\\\": \\\"https://example.com/product.schema.json\\\", \\\"title\\\": \\\"Product\\\", \\\"description\\\": \\\"A product from Acme's catalog\\\", \\\"type\\\": \\\"object\\\", \\\"properties\\\": { \\\"id\\\": { \\\"description\\\": \\\"The unique identifier for a product\\\", \\\"type\\\": \\\"string\\\" }, \\\"time\\\": { \\\"description\\\": \\\"event timestamp of the event\\\", \\\"type\\\": \\\"string\\\", \\\"format\\\" : \\\"date-time\\\" } }, \\\"required\\\": [ \\\"id\\\", \\\"time\\\" ] }\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"true\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": 
\"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key_2\", \"SOURCE_KAFKACONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location_2\", \"SOURCE_KAFKA_CONSUMER_CONFIG_OFFSET_FLUSH_INTERVAL_MS\":\"1000\"} } ]"; + when(configuration.getString(INPUT_STREAMS, "")) + .thenReturn(streamConfig); + + StreamConfig.parse(configuration); + } + @Test public void shouldAddAdditionalKafkaConfigToKafkaProperties() { when(configuration.getString(INPUT_STREAMS, "")).thenReturn("[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\" } ]"); diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/adapter/DaggerKafkaConsumerAdditionalConfigurationsAdaptorTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/adapter/DaggerKafkaConsumerAdditionalConfigurationsAdaptorTest.java new file mode 100644 index 000000000..3b5fd859a --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/adapter/DaggerKafkaConsumerAdditionalConfigurationsAdaptorTest.java @@ -0,0 +1,132 @@ +package com.gotocompany.dagger.core.source.config.adapter; + +import com.google.gson.JsonSyntaxException; +import org.junit.Rule; +import org.junit.Test; +import com.google.gson.stream.JsonReader; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.io.StringReader; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class DaggerKafkaConsumerAdditionalConfigurationsAdaptorTest { + + private final DaggerKafkaConsumerAdditionalConfigurationsAdaptor daggerKafkaConsumerAdditionalConfigurationsAdaptor = new DaggerKafkaConsumerAdditionalConfigurationsAdaptor(); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void shouldParseEmptyProperty() throws IOException { + String input = "{}"; + JsonReader jsonReader = new JsonReader(new StringReader(input)); + + Map result = daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader); + + assertEquals(0, result.size()); + } + + @Test + public void shouldParseJsonStringToMap() throws IOException { + String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1\":\"value1\",\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2\":\"value2\"}"; + JsonReader jsonReader = new JsonReader(new StringReader(input)); + Map expectedResult = new HashMap<>(); + expectedResult.put("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1", "value1"); + expectedResult.put("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2", "value2"); + + Map result = daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader); + + assertEquals(expectedResult, result); + } + + @Test + public void shouldParseJsonStringWithCaseInsensitiveKeyToMap() throws IOException { + String input = "{\"sOurCe_KAFKA_CONSUMER_CONFIG_key_1\":\"value1\"}"; + JsonReader jsonReader = new JsonReader(new StringReader(input)); + Map expectedResult = new HashMap<>(); + 
expectedResult.put("sOurCe_KAFKA_CONSUMER_CONFIG_key_1", "value1"); + + Map result = daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader); + + assertEquals(expectedResult, result); + } + + @Test + public void shouldIgnoreNullValues() throws IOException { + String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1\":null,\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2\":\"value2\"}"; + JsonReader jsonReader = new JsonReader(new StringReader(input)); + + Map result = daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader); + + assertFalse(result.containsKey("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1")); + assertEquals("value2", result.get("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2")); + } + + @Test + public void shouldHandleSpecialCharactersInValues() throws IOException { + String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY\":\"value with spaces and $pecial ch@racters\"}"; + JsonReader jsonReader = new JsonReader(new StringReader(input)); + + Map result = daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader); + + assertEquals("value with spaces and $pecial ch@racters", result.get("SOURCE_KAFKA_CONSUMER_CONFIG_KEY")); + } + + @Test + public void shouldHandleNumericalValue() throws IOException { + String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1\": \"120\", \"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2\": \"120.5\"}"; + JsonReader jsonReader = new JsonReader(new StringReader(input)); + + Map result = daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader); + + assertEquals("120", result.get("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1")); + assertEquals("120.5", result.get("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2")); + } + + @Test + public void shouldHandleBooleanValue() throws IOException { + String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1\": \"true\", \"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2\": \"false\"}"; + JsonReader jsonReader = new JsonReader(new StringReader(input)); + + Map result = daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader); + + assertEquals("true", result.get("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1")); + assertEquals("false", result.get("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2")); + } + + @Test + public void shouldWriteMapToStringJson() { + Map map = new HashMap<>(); + map.put("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1", "value1"); + map.put("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2", "120"); + map.put("source_kafka_consumer_config_key_3", "120.5"); + + String result = daggerKafkaConsumerAdditionalConfigurationsAdaptor.toJson(map); + + assertEquals("{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1\":\"value1\",\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2\":\"120\",\"source_kafka_consumer_config_key_3\":\"120.5\"}", result); + } + + @Test + public void shouldThrowExceptionForInvalidProperties() throws IOException { + String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1\":\"value1\",\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2\":\"value2\",\"INVALID_KEY\":\"value3\"}"; + JsonReader jsonReader = new JsonReader(new StringReader(input)); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid additional kafka consumer configuration properties found: [INVALID_KEY]"); + + daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader); + } + + @Test(expected = JsonSyntaxException.class) + public void shouldThrowExceptionForMalformedJson() throws IOException { + String input = "\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1\":\"value1\",\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2\":\"value2\""; + JsonReader jsonReader = new JsonReader(new StringReader(input)); + + 
daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader); + } + +} diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/utils/KafkaConfigUtilTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/utils/KafkaConfigUtilTest.java new file mode 100644 index 000000000..50f7d198d --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/utils/KafkaConfigUtilTest.java @@ -0,0 +1,105 @@ +package com.gotocompany.dagger.core.utils; + +import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata; +import org.junit.Test; + +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +public class KafkaConfigUtilTest { + + @Test + public void shouldParseMatchingSourceKafkaConsumerConfiguration() { + Properties properties = new Properties(); + properties.put("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1", "value1"); + properties.put("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2", "value2"); + properties.put("SOURCE_KAFKA_CONSUMER_CONFIGKEY_3", "value3"); + properties.put("INVALID_KEY_4", "value4"); + + Properties kafkaProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, properties); + + assertEquals(2, kafkaProperties.size()); + assertEquals("value1", kafkaProperties.getProperty("key.1")); + assertEquals("value2", kafkaProperties.getProperty("key.2")); + } + + @Test + public void shouldParseMatchingSinkKafkaProducerConfiguration() { + Properties properties = new Properties(); + properties.put("SINK_KAFKA_PRODUCER_CONFIG_KEY_1", "value1"); + properties.put("SINK_KAFKA_PRODUCER_CONFIG_KEY_2", "value2"); + properties.put("SINK_KAFKA_PRODUCER_CONFIGKEY_3", "value3"); + properties.put("INVALID_KEY_4", "value4"); + + Properties kafkaProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SINK, properties); + + assertEquals(2, kafkaProperties.size()); + assertEquals("value1", kafkaProperties.getProperty("key.1")); + assertEquals("value2", kafkaProperties.getProperty("key.2")); + } + + @Test + public void shouldReturnEmptyPropertiesWhenInputIsEmpty() { + Properties properties = new Properties(); + Properties kafkaProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, properties); + + assertEquals(0, kafkaProperties.size()); + } + + @Test + public void shouldReturnEmptyPropertiesWhenAllKeysAreInvalid() { + Properties properties = new Properties(); + properties.put("INVALID_KEY_1", "value1"); + properties.put("INVALID_KEY_2", "value2"); + properties.put("ANOTHER_INVALID_KEY", "value3"); + + Properties kafkaProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, properties); + + assertEquals(0, kafkaProperties.size()); + } + + @Test + public void shouldParseOnlyValidKeysWhenMixedWithInvalidOnes() { + Properties properties = new Properties(); + properties.put("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1", "value1"); + properties.put("INVALID_KEY", "value2"); + properties.put("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2", "value3"); + properties.put("ANOTHER_INVALID_KEY", "value4"); + + Properties kafkaProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, properties); + + assertEquals(2, kafkaProperties.size()); + assertEquals("value1", kafkaProperties.getProperty("key.1")); + assertEquals("value3", kafkaProperties.getProperty("key.2")); + } + + @Test + public void shouldParseCaseInsensitiveKeys() { + Properties properties = new Properties(); + properties.put("source_kafka_consumer_config_KEY_1", "value1"); + 
properties.put("SOURCE_KAFKA_CONSUMER_CONFIG_key_2", "value2"); + + Properties kafkaProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, properties); + + assertEquals(2, kafkaProperties.size()); + assertEquals("value1", kafkaProperties.getProperty("key.1")); + assertEquals("value2", kafkaProperties.getProperty("key.2")); + } + + @Test + public void shouldParseKeysWithMultipleUnderscores() { + Properties properties = new Properties(); + properties.put("SOURCE_KAFKA_CONSUMER_CONFIG_MULTI_WORD_KEY", "value1"); + properties.put("SOURCE_KAFKA_CONSUMER_CONFIG_ANOTHER_MULTI_WORD_KEY", "value2"); + properties.put("SOURCE_KAFKA_CONSUMER_CONFIG__YET___ANOTHER_MULTI__WORD_KEY", "value3"); + + Properties kafkaProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, properties); + + assertEquals(3, kafkaProperties.size()); + assertEquals("value1", kafkaProperties.getProperty("multi.word.key")); + assertEquals("value2", kafkaProperties.getProperty("another.multi.word.key")); + assertEquals("value3", kafkaProperties.getProperty("yet.another.multi.word.key")); + } + +} diff --git a/docs/docs/guides/kafka.md b/docs/docs/guides/kafka.md new file mode 100644 index 000000000..39efaee6b --- /dev/null +++ b/docs/docs/guides/kafka.md @@ -0,0 +1,41 @@ +# Kafka + +Kafka topics are used as the source and output of daggers. Both of source and output kafka configurations are defined through the properties file. + +## Source Kafka Configuration + +There can be multiple source kafka configurations in the properties file. Source configurations are defined through `STREAMS` property. +Here are the predefined properties for source kafka configuration: + +- SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE +- SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID +- SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS +- SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL +- SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM +- SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG +- SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS +- SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD +- SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION +- SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD +- SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_TYPE +- SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_LOCATION +- SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD +- SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_TYPE +- SOURCE_KAFKA_CONSUMER_CONFIG_SSL_PROTOCOL + +Additional kafka configuration can be passed through the `SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS` property. This property should be a json key-value map. +For example : +- SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS={"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_DESERIALIZER":"org.apache.kafka.common.serialization.StringDeserializer","SOURCE_KAFKA_CONSUMER_CONFIG_VALUE_DESERIALIZER":"org.apache.kafka.common.serialization.StringDeserializer"} + + +## Sink Kafka Configuration + +There is only one sink kafka configuration in the properties file. Sink configuration is defined by properties having `SINK_KAFKA_PRODUCER_CONFIG` prefix. +Here are the predefined properties for sink kafka configuration: +- SINK_KAFKA_LINGER_MS_KEY +- SINK_KAFKA_BROKERS_KEY +- SINK_KAFKA_TOPIC_KEY + +Additional kafka configurations can be passed through by introducing new properties with the `SINK_KAFKA_PRODUCER_CONFIG` prefix. 
+For example: `SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS="io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler"` +The example above will add `sasl.login.callback.handler.class` to the sink kafka configuration. \ No newline at end of file diff --git a/version.txt b/version.txt index 9028ec636..69da6ebcd 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.10.5 +0.10.6
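For illustration, the key mapping this patch introduces in KafkaConfigUtil.parseKafkaConfiguration (strip the connector prefix, lower-case the remainder, collapse underscore runs into dots) can be sketched in isolation as below. This is a minimal standalone sketch; the class name and the sample property values are illustrative assumptions, not part of the patch.

import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DynamicKafkaConfigExample {
    // Same pattern shape as KafkaConnectorTypesMetadata.SOURCE: the prefix followed by the raw Kafka key.
    private static final Pattern SOURCE_PREFIX =
            Pattern.compile("^SOURCE_KAFKA_CONSUMER_CONFIG_+(.*)", Pattern.CASE_INSENSITIVE);

    public static void main(String[] args) {
        Properties raw = new Properties();
        raw.setProperty("SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION", "/etc/keystore.jks");
        raw.setProperty("SOURCE_KAFKA_CONSUMER_CONFIG_FETCH_MAX_WAIT_MS", "500");
        raw.setProperty("UNRELATED_KEY", "ignored");

        Properties kafkaProps = new Properties();
        for (String key : raw.stringPropertyNames()) {
            Matcher matcher = SOURCE_PREFIX.matcher(key);
            if (matcher.find()) {
                // Strip the prefix, lower-case, and turn underscore runs into dots,
                // e.g. SSL_KEYSTORE_LOCATION -> ssl.keystore.location
                kafkaProps.setProperty(matcher.group(1).toLowerCase().replaceAll("_+", "."),
                        raw.getProperty(key));
            }
        }
        // Prints ssl.keystore.location and fetch.max.wait.ms; UNRELATED_KEY is dropped.
        System.out.println(kafkaProps);
    }
}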