feat: Add dynamic source and sink kafka properties (#31)
* add kafka security module

* Add SASL callback class config for producer and consumer

* Add config map

* remove build.gradle

* Add dynamic props

* Update regex

* rename var

* Remove redundant imports

* Rename prefix

* Remove unused import

* Update test

* Add implementation for sink dynamic props

* Add null checking for the additional props

* Added validations on source config

* Add docs and refactor pattern to enum

* Checkstyle

* Add readme

* Make the pattern more specific and embedded to the enum

* Add more tests

* Bump version

* Add unit tests

* Use expected annotation

* Assert exception message and fail if no exception is thrown

* Use rule for asserting exception

* Add more test cases

* Add more unit tests

* feat: Enable multiple underscore parsing

* test: Add test on multiple underscore parsing
ekawinataa authored Sep 16, 2024
1 parent 6d225b4 commit d7b2709
Showing 12 changed files with 434 additions and 6 deletions.
@@ -0,0 +1,18 @@
package com.gotocompany.dagger.core.enumeration;

import java.util.regex.Pattern;

public enum KafkaConnectorTypesMetadata {
    SOURCE("SOURCE_KAFKA_CONSUMER_CONFIG_+"), SINK("SINK_KAFKA_PRODUCER_CONFIG_+");

    KafkaConnectorTypesMetadata(String prefixPattern) {
        this.prefixPattern = prefixPattern;
    }

    private final String prefixPattern;

    public Pattern getConfigurationPattern() {
        return Pattern.compile(String.format("^%s(.*)", prefixPattern), Pattern.CASE_INSENSITIVE);
    }

}
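For illustration, a minimal sketch (not part of this commit) of how the pattern is meant to be consumed: the connector prefix is matched case-insensitively and group(1) captures the remainder of the key. The key name below is an example only.

import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;
import java.util.regex.Matcher;

// Hypothetical usage sketch of the configuration pattern.
class KafkaConnectorTypesMetadataSketch {
    public static void main(String[] args) {
        Matcher matcher = KafkaConnectorTypesMetadata.SINK.getConfigurationPattern()
                .matcher("SINK_KAFKA_PRODUCER_CONFIG_BUFFER_MEMORY");
        if (matcher.find()) {
            // group(1) holds everything after the prefix: "BUFFER_MEMORY"
            System.out.println(matcher.group(1));
        }
    }
}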
@@ -1,14 +1,17 @@
package com.gotocompany.dagger.core.sink;

import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;
import com.gotocompany.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
import com.gotocompany.dagger.core.metrics.telemetry.TelemetryPublisher;
import com.gotocompany.dagger.core.metrics.telemetry.TelemetryTypes;
import com.gotocompany.dagger.core.sink.bigquery.BigQuerySinkBuilder;
import com.gotocompany.dagger.core.sink.influx.ErrorHandler;
import com.gotocompany.dagger.core.sink.influx.InfluxDBFactoryWrapper;
import com.gotocompany.dagger.core.sink.influx.InfluxDBSink;
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;
import com.gotocompany.dagger.core.utils.Constants;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
@@ -25,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

/**
@@ -109,7 +113,10 @@ protected Properties getProducerProperties(Configuration configuration) {
        String lingerMs = configuration.getString(Constants.SINK_KAFKA_LINGER_MS_KEY, Constants.SINK_KAFKA_LINGER_MS_DEFAULT);
        validateLingerMs(lingerMs);
        kafkaProducerConfigs.setProperty(Constants.SINK_KAFKA_LINGER_MS_CONFIG_KEY, lingerMs);

        Properties dynamicProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SINK, Optional.ofNullable(configuration.getParam())
                .map(ParameterTool::getProperties)
                .orElseGet(Properties::new));
        kafkaProducerConfigs.putAll(dynamicProperties);
        return kafkaProducerConfigs;
    }
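With this change, any job parameter carrying the SINK_KAFKA_PRODUCER_CONFIG_ prefix is folded into the producer properties. A minimal sketch of that mapping, assuming an illustrative callback-handler class name (io.example.OAuthCallbackHandler is a placeholder, not from this commit):

import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;
import org.apache.flink.api.java.utils.ParameterTool;
import java.util.Properties;

// Hypothetical sketch of how a dynamic sink property flows into producer config.
class SinkDynamicPropsSketch {
    public static void main(String[] args) {
        // A dynamic producer property passed as a plain job argument.
        ParameterTool params = ParameterTool.fromArgs(new String[]{
                "--SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS", "io.example.OAuthCallbackHandler"});
        Properties dynamic = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SINK, params.getProperties());
        // Yields sasl.login.callback.handler.class=io.example.OAuthCallbackHandler
        System.out.println(dynamic);
    }
}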

@@ -1,6 +1,8 @@
package com.gotocompany.dagger.core.source.config;

import com.google.gson.annotations.JsonAdapter;
import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;
import com.gotocompany.dagger.core.source.config.adapter.DaggerKafkaConsumerAdditionalConfigurationsAdaptor;
import com.gotocompany.dagger.core.source.config.adapter.DaggerSASLMechanismAdaptor;
import com.gotocompany.dagger.core.source.config.adapter.DaggerSSLKeyStoreFileTypeAdaptor;
import com.gotocompany.dagger.core.source.config.adapter.DaggerSSLProtocolAdaptor;
@@ -14,6 +16,7 @@
import com.gotocompany.dagger.core.source.config.models.SourceDetails;
import com.gotocompany.dagger.core.source.config.models.SourceName;
import com.gotocompany.dagger.core.source.config.models.TimeRangePool;
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import com.google.gson.Gson;
@@ -26,6 +29,7 @@

import java.io.StringReader;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -34,7 +38,6 @@
import static com.gotocompany.dagger.common.core.Constants.STREAM_INPUT_SCHEMA_PROTO_CLASS;
import static com.gotocompany.dagger.common.core.Constants.STREAM_INPUT_SCHEMA_TABLE;
import static com.gotocompany.dagger.core.utils.Constants.*;
import static com.gotocompany.dagger.core.utils.Constants.STREAM_SOURCE_PARQUET_FILE_DATE_RANGE_KEY;

public class StreamConfig {
    private static final Gson GSON = new GsonBuilder()
@@ -154,6 +157,11 @@ public class StreamConfig {
    @Getter
    private SourceParquetSchemaMatchStrategy parquetSchemaMatchStrategy;

    @SerializedName(SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS)
    @JsonAdapter(value = DaggerKafkaConsumerAdditionalConfigurationsAdaptor.class)
    @Getter
    private Map<String, String> additionalConsumerConfigurations;

    @SerializedName(STREAM_SOURCE_PARQUET_FILE_DATE_RANGE_KEY)
    @JsonAdapter(FileDateRangeAdaptor.class)
    @Getter
@@ -208,7 +216,7 @@ public Properties getKafkaProps(Configuration configuration) {
                .stream()
                .filter(e -> e.getKey().toLowerCase().startsWith(KAFKA_PREFIX))
                .forEach(e -> kafkaProps.setProperty(parseVarName(e.getKey(), KAFKA_PREFIX), e.getValue()));
        setAdditionalConfigs(kafkaProps, configuration);
        setAdditionalKafkaConsumerConfigs(kafkaProps, configuration);
        return kafkaProps;
    }

@@ -217,10 +225,15 @@ private String parseVarName(String varName, String kafkaPrefix) {
        return String.join(".", names);
    }

    private void setAdditionalConfigs(Properties kafkaProps, Configuration configuration) {
    private void setAdditionalKafkaConsumerConfigs(Properties kafkaProps, Configuration configuration) {
        if (configuration.getBoolean(SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_KEY, SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_DEFAULT)) {
            kafkaProps.setProperty(SOURCE_KAFKA_MAX_PARTITION_FETCH_BYTES_KEY, SOURCE_KAFKA_MAX_PARTITION_FETCH_BYTES_DEFAULT);
        }
        if (Objects.nonNull(this.additionalConsumerConfigurations)) {
            Properties additionalKafkaProperties = new Properties();
            additionalKafkaProperties.putAll(this.additionalConsumerConfigurations);
            kafkaProps.putAll(KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, additionalKafkaProperties));
        }
    }

    public Pattern getTopicPattern() {
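End to end on the source side, the SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS JSON map is deserialized by the adaptor below and then translated into consumer properties by KafkaConfigUtil. A minimal sketch, assuming an illustrative callback-handler class name (io.example.OAuthCallbackHandler is a placeholder):

import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;
import com.gotocompany.dagger.core.source.config.adapter.DaggerKafkaConsumerAdditionalConfigurationsAdaptor;
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch of the source-side flow: JSON map -> validated map -> Kafka consumer properties.
class SourceDynamicPropsSketch {
    public static void main(String[] args) throws Exception {
        // Value of SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS inside a STREAMS entry (example only).
        String json = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS\": \"io.example.OAuthCallbackHandler\"}";
        Map<String, String> additional = new DaggerKafkaConsumerAdditionalConfigurationsAdaptor().fromJson(json);
        Properties raw = new Properties();
        raw.putAll(additional);
        // Yields sasl.login.callback.handler.class=io.example.OAuthCallbackHandler
        System.out.println(KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, raw));
    }
}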
@@ -0,0 +1,40 @@
package com.gotocompany.dagger.core.source.config.adapter;

import com.google.gson.Gson;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DaggerKafkaConsumerAdditionalConfigurationsAdaptor extends TypeAdapter<Map<String, String>> {

    @Override
    public void write(JsonWriter jsonWriter, Map<String, String> stringStringMap) throws IOException {
        Gson gson = new Gson();
        jsonWriter.jsonValue(gson.toJson(stringStringMap));
    }

    @Override
    public Map<String, String> read(JsonReader jsonReader) throws IOException {
        Gson gson = new Gson();
        Map<String, String> map = gson.fromJson(jsonReader, Map.class);
        List<String> invalidProps = map.keySet().stream()
                .filter(key -> !KafkaConnectorTypesMetadata.SOURCE.getConfigurationPattern()
                        .matcher(key)
                        .matches())
                .collect(Collectors.toList());
        if (!invalidProps.isEmpty()) {
            throw new IllegalArgumentException("Invalid additional kafka consumer configuration properties found: " + invalidProps);
        }
        return map.entrySet()
                .stream()
                .filter(entry -> entry.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

}
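The read() path rejects keys that do not carry the SOURCE_KAFKA_CONSUMER_CONFIG_ prefix, so misspelled or raw Kafka keys fail fast at configuration time. A minimal sketch of that behaviour (key names and values are examples only):

import com.gotocompany.dagger.core.source.config.adapter.DaggerKafkaConsumerAdditionalConfigurationsAdaptor;

// Hypothetical sketch of the adaptor's key validation.
class AdaptorValidationSketch {
    public static void main(String[] args) throws Exception {
        DaggerKafkaConsumerAdditionalConfigurationsAdaptor adaptor = new DaggerKafkaConsumerAdditionalConfigurationsAdaptor();
        // Accepted: key matches the SOURCE prefix pattern.
        adaptor.fromJson("{\"SOURCE_KAFKA_CONSUMER_CONFIG_FETCH_MIN_BYTES\": \"1024\"}");
        // Rejected: raw Kafka key without the prefix -> IllegalArgumentException listing the invalid property.
        adaptor.fromJson("{\"fetch.min.bytes\": \"1024\"}");
    }
}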
@@ -129,7 +129,7 @@ public class Constants {
    public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL";
    public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM";
    public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG";

    public static final String SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS = "SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS";
    public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD";
    public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION";
    public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD";
@@ -0,0 +1,28 @@
package com.gotocompany.dagger.core.utils;

import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;

import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;

public class KafkaConfigUtil {

    public static Properties parseKafkaConfiguration(KafkaConnectorTypesMetadata kafkaConnectorTypesMetadata, Properties properties) {
        Properties kafkaProperties = new Properties();
        Set<Object> configKeys = properties.keySet();

        for (Object key : configKeys) {
            Matcher matcher = kafkaConnectorTypesMetadata.getConfigurationPattern()
                    .matcher(key.toString());
            if (matcher.find()) {
                String kafkaConfigKey = matcher.group(1)
                        .toLowerCase()
                        .replaceAll("_+", ".");
                kafkaProperties.setProperty(kafkaConfigKey, properties.get(key).toString());
            }
        }
        return kafkaProperties;
    }

}
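The translation rule implemented here: strip the connector prefix, lower-case the remainder, and collapse any run of underscores into a single dot, which is what the multiple-underscore commit enables. A minimal sketch (keys and values are examples only):

import com.gotocompany.dagger.core.enumeration.KafkaConnectorTypesMetadata;
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;
import java.util.Properties;

// Hypothetical sketch of the prefix-stripping and underscore-to-dot translation.
class KafkaConfigUtilSketch {
    public static void main(String[] args) {
        Properties raw = new Properties();
        raw.setProperty("SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION", "/tmp/keystore.jks");
        // A run of underscores still collapses to a single dot: sasl.mechanism
        raw.setProperty("SOURCE_KAFKA_CONSUMER_CONFIG_SASL__MECHANISM", "SCRAM-SHA-512");
        // Keys without the prefix are silently ignored.
        raw.setProperty("UNRELATED_KEY", "ignored");

        // Yields ssl.keystore.location=/tmp/keystore.jks and sasl.mechanism=SCRAM-SHA-512
        System.out.println(KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, raw));
    }
}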
@@ -26,6 +26,9 @@

public class SinkOrchestratorTest {

    private static final String SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS = "SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS";
    private static final String SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE = "com.gotocompany.dagger.core.utils.SinkKafkaConfigUtil";

    private Configuration configuration;
    private StencilClientOrchestrator stencilClientOrchestrator;
    private SinkOrchestrator sinkOrchestrator;
@@ -71,14 +74,19 @@ public void shouldGiveInfluxWhenConfiguredToUseNothing() throws Exception {

    @Test
    public void shouldSetKafkaProducerConfigurations() throws Exception {
        Map<String, String> additionalParameters = new HashMap<>();
        additionalParameters.put(SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS, SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE);
        when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668");
        when(configuration.getBoolean(eq(Constants.SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY), anyBoolean())).thenReturn(true);
        when(configuration.getString(eq(Constants.SINK_KAFKA_LINGER_MS_KEY), anyString())).thenReturn("1000");
        when(configuration.getParam()).thenReturn(ParameterTool.fromMap(additionalParameters));
        when(configuration.getString(eq(SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS), anyString())).thenReturn(SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE);
        Properties producerProperties = sinkOrchestrator.getProducerProperties(configuration);

        assertEquals(producerProperties.getProperty("compression.type"), "snappy");
        assertEquals(producerProperties.getProperty("max.request.size"), "20971520");
        assertEquals(producerProperties.getProperty("linger.ms"), "1000");
        assertEquals(producerProperties.getProperty("sasl.login.callback.handler.class"), SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE);
    }

    @Test