Skip to content

feat: Add mechanism to load SSL password from File #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.gotocompany.dagger.core.sink.influx.InfluxDBSink;
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;
import com.gotocompany.dagger.core.utils.Constants;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
Expand All @@ -24,6 +25,9 @@
import com.gotocompany.dagger.core.sink.kafka.KafkaSerializerBuilder;
import com.gotocompany.dagger.core.sink.log.LogSink;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -117,6 +121,7 @@ protected Properties getProducerProperties(Configuration configuration) {
.map(ParameterTool::getProperties)
.orElseGet(Properties::new));
kafkaProducerConfigs.putAll(dynamicProperties);
setSslPasswordsFromFile(configuration, kafkaProducerConfigs);
return kafkaProducerConfigs;
}

Expand All @@ -136,4 +141,23 @@ public Map<String, List<String>> getTelemetry() {
private void addMetric(String key, String value) {
metrics.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
}

private void setSslPasswordsFromFile(Configuration configuration, Properties kafkaProps) {
String sslTruststorePasswordFileLocation = configuration.getString(Constants.SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY, StringUtils.EMPTY);
if (StringUtils.isNotEmpty(sslTruststorePasswordFileLocation)) {
kafkaProps.setProperty(Constants.KAFKA_PROPS_SSL_TRUSTSTORE_PASSWORD_KEY, parsePasswordFile(sslTruststorePasswordFileLocation));
}
String sslKeystorePasswordFileLocation = configuration.getString(Constants.SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY, StringUtils.EMPTY);
if (StringUtils.isNotEmpty(sslKeystorePasswordFileLocation)) {
kafkaProps.setProperty(Constants.KAFKA_PROPS_SSL_KEYSTORE_PASSWORD_KEY, parsePasswordFile(sslKeystorePasswordFileLocation));
}
}

private String parsePasswordFile(String path) {
try {
return new String(Files.readAllBytes(Paths.get(path)));
} catch (IOException e) {
throw new IllegalArgumentException("Error reading password file: " + path, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.gotocompany.dagger.core.source.config.models.SourceName;
import com.gotocompany.dagger.core.source.config.models.TimeRangePool;
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import com.google.gson.Gson;
Expand All @@ -27,7 +28,10 @@
import lombok.Getter;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.io.IOException;
import java.io.StringReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
Expand Down Expand Up @@ -59,6 +63,10 @@ public class StreamConfig {
@Getter
private String sslKeystorePassword;

@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION)
@Getter
private String sslKeystorePasswordFileLocation;

@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_TYPE_KEY)
@Getter
@JsonAdapter(value = DaggerSSLKeyStoreFileTypeAdaptor.class)
Expand All @@ -77,6 +85,10 @@ public class StreamConfig {
@Getter
private String sslTruststorePassword;

@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION)
@Getter
private String sslTruststorePasswordFileLocation;

@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_TYPE_KEY)
@Getter
@JsonAdapter(value = DaggerSSLTrustStoreFileTypeAdaptor.class)
Expand Down Expand Up @@ -217,6 +229,7 @@ public Properties getKafkaProps(Configuration configuration) {
.filter(e -> e.getKey().toLowerCase().startsWith(KAFKA_PREFIX))
.forEach(e -> kafkaProps.setProperty(parseVarName(e.getKey(), KAFKA_PREFIX), e.getValue()));
setAdditionalKafkaConsumerConfigs(kafkaProps, configuration);
setSslPasswords(kafkaProps);
return kafkaProps;
}

Expand Down Expand Up @@ -247,4 +260,21 @@ public OffsetsInitializer getStartingOffset() {
private OffsetResetStrategy getOffsetResetStrategy() {
return OffsetResetStrategy.valueOf(autoOffsetReset.toUpperCase());
}

private void setSslPasswords(Properties kafkaProps) {
if (StringUtils.isNotEmpty(sslTruststorePasswordFileLocation)) {
kafkaProps.setProperty(KAFKA_PROPS_SSL_TRUSTSTORE_PASSWORD_KEY, parsePasswordFile(sslTruststorePasswordFileLocation));
}
if (StringUtils.isNotEmpty(sslKeystorePasswordFileLocation)) {
kafkaProps.setProperty(KAFKA_PROPS_SSL_KEYSTORE_PASSWORD_KEY, parsePasswordFile(sslKeystorePasswordFileLocation));
}
}

private String parsePasswordFile(String path) {
try {
return new String(Files.readAllBytes(Paths.get(path)));
} catch (IOException e) {
throw new IllegalArgumentException("Error reading password file: " + path, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public class Constants {
public static final String SINK_KAFKA_MAX_REQUEST_SIZE_KEY = "max.request.size";
public static final String SINK_KAFKA_MAX_REQUEST_SIZE_DEFAULT = "20971520";
public static final String SINK_KAFKA_LINGER_MS_DEFAULT = "0";
public static final String SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY = "SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY";
public static final String SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY = "SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY";

public static final String KAFKA_PROPS_SSL_TRUSTSTORE_PASSWORD_KEY = "ssl.truststore.password";
public static final String KAFKA_PROPS_SSL_KEYSTORE_PASSWORD_KEY = "ssl.keystore.password";

public static final String ES_TYPE = "ES";
public static final String HTTP_TYPE = "HTTP";
Expand Down Expand Up @@ -133,9 +138,11 @@ public class Constants {
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_TYPE_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_TYPE";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_LOCATION_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_LOCATION";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_TYPE_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_TYPE";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_PROTOCOL_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_PROTOCOL";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
import org.apache.flink.api.java.utils.ParameterTool;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mock;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.*;

import static com.gotocompany.dagger.common.core.Constants.*;
Expand All @@ -36,6 +41,8 @@ public class SinkOrchestratorTest {
private MetricsTelemetryExporter telemetryExporter;
@Mock
private DaggerStatsDReporter daggerStatsDReporter;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Before
public void setup() {
Expand Down Expand Up @@ -73,14 +80,15 @@ public void shouldGiveInfluxWhenConfiguredToUseNothing() throws Exception {
}

@Test
public void shouldSetKafkaProducerConfigurations() throws Exception {
public void shouldSetKafkaProducerConfigurations() {
Map<String, String> additionalParameters = new HashMap<>();
additionalParameters.put(SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS, SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE);
when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668");
when(configuration.getBoolean(eq(Constants.SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY), anyBoolean())).thenReturn(true);
when(configuration.getString(eq(Constants.SINK_KAFKA_LINGER_MS_KEY), anyString())).thenReturn("1000");
when(configuration.getParam()).thenReturn(ParameterTool.fromMap(additionalParameters));
when(configuration.getString(eq(SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS), anyString())).thenReturn(SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE);

Properties producerProperties = sinkOrchestrator.getProducerProperties(configuration);

assertEquals(producerProperties.getProperty("compression.type"), "snappy");
Expand All @@ -89,6 +97,42 @@ public void shouldSetKafkaProducerConfigurations() throws Exception {
assertEquals(producerProperties.getProperty("sasl.login.callback.handler.class"), SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE);
}

@Test
public void shouldParseSslPasswords() throws IOException {
File trustStorePasswordFile = writeDummyPasswordFile("truststore-password.txt", "truststore-password");
File keyStorePasswordFile = writeDummyPasswordFile("keystore-password.txt", "keystore-password");
when(configuration.getString(eq(Constants.SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY), anyString())).thenReturn(trustStorePasswordFile.getAbsolutePath());
when(configuration.getString(eq(Constants.SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY), anyString())).thenReturn(keyStorePasswordFile.getAbsolutePath());
when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668");
when(configuration.getBoolean(eq(Constants.SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY), anyBoolean())).thenReturn(true);
when(configuration.getString(eq(Constants.SINK_KAFKA_LINGER_MS_KEY), anyString())).thenReturn("1000");

Properties producerProperties = sinkOrchestrator.getProducerProperties(configuration);

assertEquals(producerProperties.getProperty("ssl.truststore.password"), "truststore-password");
assertEquals(producerProperties.getProperty("ssl.keystore.password"), "keystore-password");
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowWhenKeyStorePasswordIsNotExists() {
when(configuration.getString(eq(Constants.SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY), anyString())).thenReturn("/non-exists.txt");
when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668");
when(configuration.getBoolean(eq(Constants.SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY), anyBoolean())).thenReturn(true);
when(configuration.getString(eq(Constants.SINK_KAFKA_LINGER_MS_KEY), anyString())).thenReturn("1000");

sinkOrchestrator.getProducerProperties(configuration);
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowWhenTrustStorePasswordIsNotExists() {
when(configuration.getString(eq(Constants.SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY), anyString())).thenReturn("/non-exists.txt");
when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668");
when(configuration.getBoolean(eq(Constants.SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY), anyBoolean())).thenReturn(true);
when(configuration.getString(eq(Constants.SINK_KAFKA_LINGER_MS_KEY), anyString())).thenReturn("1000");

sinkOrchestrator.getProducerProperties(configuration);
}

@Test
public void shouldThrowIllegalArgumentExceptionForInvalidLingerMs() throws Exception {
when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668");
Expand Down Expand Up @@ -119,4 +163,12 @@ public void shouldReturnBigQuerySink() {
Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter);
assertThat(sinkFunction, instanceOf(BigQuerySink.class));
}

private File writeDummyPasswordFile(String fileName, String password) throws IOException {
File passwordFile = temporaryFolder.newFile(fileName);
FileWriter writer = new FileWriter(passwordFile);
writer.write(password);
writer.close();
return passwordFile;
}
}
Loading
Loading