diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java index 05997e445..3270edc8c 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java @@ -10,6 +10,7 @@ import com.gotocompany.dagger.core.sink.influx.InfluxDBSink; import com.gotocompany.dagger.core.utils.KafkaConfigUtil; import com.gotocompany.dagger.core.utils.Constants; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.base.DeliveryGuarantee; @@ -24,6 +25,9 @@ import com.gotocompany.dagger.core.sink.kafka.KafkaSerializerBuilder; import com.gotocompany.dagger.core.sink.log.LogSink; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -117,6 +121,7 @@ protected Properties getProducerProperties(Configuration configuration) { .map(ParameterTool::getProperties) .orElseGet(Properties::new)); kafkaProducerConfigs.putAll(dynamicProperties); + setSslPasswordsFromFile(configuration, kafkaProducerConfigs); return kafkaProducerConfigs; } @@ -136,4 +141,23 @@ public Map> getTelemetry() { private void addMetric(String key, String value) { metrics.computeIfAbsent(key, k -> new ArrayList<>()).add(value); } + + private void setSslPasswordsFromFile(Configuration configuration, Properties kafkaProps) { + String sslTruststorePasswordFileLocation = configuration.getString(Constants.SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY, StringUtils.EMPTY); + if (StringUtils.isNotEmpty(sslTruststorePasswordFileLocation)) { + kafkaProps.setProperty(Constants.KAFKA_PROPS_SSL_TRUSTSTORE_PASSWORD_KEY, parsePasswordFile(sslTruststorePasswordFileLocation)); + } + String sslKeystorePasswordFileLocation = configuration.getString(Constants.SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY, StringUtils.EMPTY); + if (StringUtils.isNotEmpty(sslKeystorePasswordFileLocation)) { + kafkaProps.setProperty(Constants.KAFKA_PROPS_SSL_KEYSTORE_PASSWORD_KEY, parsePasswordFile(sslKeystorePasswordFileLocation)); + } + } + + private String parsePasswordFile(String path) { + try { + return new String(Files.readAllBytes(Paths.get(path))); + } catch (IOException e) { + throw new IllegalArgumentException("Error reading password file: " + path, e); + } + } } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/source/config/StreamConfig.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/source/config/StreamConfig.java index 6ade4a723..b7b834bd1 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/source/config/StreamConfig.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/source/config/StreamConfig.java @@ -17,6 +17,7 @@ import com.gotocompany.dagger.core.source.config.models.SourceName; import com.gotocompany.dagger.core.source.config.models.TimeRangePool; import com.gotocompany.dagger.core.utils.KafkaConfigUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import com.google.gson.Gson; @@ -27,7 +28,10 @@ import lombok.Getter; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import java.io.IOException; import java.io.StringReader; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Map; import java.util.Objects; import java.util.Properties; @@ -59,6 +63,10 @@ public class StreamConfig { @Getter private String sslKeystorePassword; + @SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION) + @Getter + private String sslKeystorePasswordFileLocation; + @SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_TYPE_KEY) @Getter @JsonAdapter(value = DaggerSSLKeyStoreFileTypeAdaptor.class) @@ -77,6 +85,10 @@ public class StreamConfig { @Getter private String sslTruststorePassword; + @SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION) + @Getter + private String sslTruststorePasswordFileLocation; + @SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_TYPE_KEY) @Getter @JsonAdapter(value = DaggerSSLTrustStoreFileTypeAdaptor.class) @@ -217,6 +229,7 @@ public Properties getKafkaProps(Configuration configuration) { .filter(e -> e.getKey().toLowerCase().startsWith(KAFKA_PREFIX)) .forEach(e -> kafkaProps.setProperty(parseVarName(e.getKey(), KAFKA_PREFIX), e.getValue())); setAdditionalKafkaConsumerConfigs(kafkaProps, configuration); + setSslPasswords(kafkaProps); return kafkaProps; } @@ -247,4 +260,21 @@ public OffsetsInitializer getStartingOffset() { private OffsetResetStrategy getOffsetResetStrategy() { return OffsetResetStrategy.valueOf(autoOffsetReset.toUpperCase()); } + + private void setSslPasswords(Properties kafkaProps) { + if (StringUtils.isNotEmpty(sslTruststorePasswordFileLocation)) { + kafkaProps.setProperty(KAFKA_PROPS_SSL_TRUSTSTORE_PASSWORD_KEY, parsePasswordFile(sslTruststorePasswordFileLocation)); + } + if (StringUtils.isNotEmpty(sslKeystorePasswordFileLocation)) { + kafkaProps.setProperty(KAFKA_PROPS_SSL_KEYSTORE_PASSWORD_KEY, parsePasswordFile(sslKeystorePasswordFileLocation)); + } + } + + private String parsePasswordFile(String path) { + try { + return new String(Files.readAllBytes(Paths.get(path))); + } catch (IOException e) { + throw new IllegalArgumentException("Error reading password file: " + path, e); + } + } } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java index 78fdbe707..a88f7e0f5 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java @@ -82,6 +82,11 @@ public class Constants { public static final String SINK_KAFKA_MAX_REQUEST_SIZE_KEY = "max.request.size"; public static final String SINK_KAFKA_MAX_REQUEST_SIZE_DEFAULT = "20971520"; public static final String SINK_KAFKA_LINGER_MS_DEFAULT = "0"; + public static final String SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY = "SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY"; + public static final String SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY = "SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY"; + + public static final String KAFKA_PROPS_SSL_TRUSTSTORE_PASSWORD_KEY = "ssl.truststore.password"; + public static final String KAFKA_PROPS_SSL_KEYSTORE_PASSWORD_KEY = "ssl.keystore.password"; public static final String ES_TYPE = "ES"; public static final String HTTP_TYPE = "HTTP"; @@ -133,9 +138,11 @@ public class Constants { public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD"; public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION"; public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD"; + public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION"; public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_TYPE_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_TYPE"; public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_LOCATION_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_LOCATION"; public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD"; + public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION"; public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_TYPE_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_TYPE"; public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SSL_PROTOCOL_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_PROTOCOL"; diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java index 1a660c543..21af6f6a9 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java @@ -12,9 +12,14 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.mockito.Mock; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; import java.util.*; import static com.gotocompany.dagger.common.core.Constants.*; @@ -36,6 +41,8 @@ public class SinkOrchestratorTest { private MetricsTelemetryExporter telemetryExporter; @Mock private DaggerStatsDReporter daggerStatsDReporter; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Before public void setup() { @@ -73,7 +80,7 @@ public void shouldGiveInfluxWhenConfiguredToUseNothing() throws Exception { } @Test - public void shouldSetKafkaProducerConfigurations() throws Exception { + public void shouldSetKafkaProducerConfigurations() { Map additionalParameters = new HashMap<>(); additionalParameters.put(SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS, SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE); when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668"); @@ -81,6 +88,7 @@ public void shouldSetKafkaProducerConfigurations() throws Exception { when(configuration.getString(eq(Constants.SINK_KAFKA_LINGER_MS_KEY), anyString())).thenReturn("1000"); when(configuration.getParam()).thenReturn(ParameterTool.fromMap(additionalParameters)); when(configuration.getString(eq(SINK_KAFKA_PRODUCER_CONFIG_SASL_LOGIN_CALLBACK_HANDLER_CLASS), anyString())).thenReturn(SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE); + Properties producerProperties = sinkOrchestrator.getProducerProperties(configuration); assertEquals(producerProperties.getProperty("compression.type"), "snappy"); @@ -89,6 +97,42 @@ public void shouldSetKafkaProducerConfigurations() throws Exception { assertEquals(producerProperties.getProperty("sasl.login.callback.handler.class"), SASL_LOGIN_CALLBACK_HANDLER_CLASS_VALUE); } + @Test + public void shouldParseSslPasswords() throws IOException { + File trustStorePasswordFile = writeDummyPasswordFile("truststore-password.txt", "truststore-password"); + File keyStorePasswordFile = writeDummyPasswordFile("keystore-password.txt", "keystore-password"); + when(configuration.getString(eq(Constants.SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY), anyString())).thenReturn(trustStorePasswordFile.getAbsolutePath()); + when(configuration.getString(eq(Constants.SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY), anyString())).thenReturn(keyStorePasswordFile.getAbsolutePath()); + when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668"); + when(configuration.getBoolean(eq(Constants.SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY), anyBoolean())).thenReturn(true); + when(configuration.getString(eq(Constants.SINK_KAFKA_LINGER_MS_KEY), anyString())).thenReturn("1000"); + + Properties producerProperties = sinkOrchestrator.getProducerProperties(configuration); + + assertEquals(producerProperties.getProperty("ssl.truststore.password"), "truststore-password"); + assertEquals(producerProperties.getProperty("ssl.keystore.password"), "keystore-password"); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowWhenKeyStorePasswordIsNotExists() { + when(configuration.getString(eq(Constants.SINK_KAFKA_SSL_KEYSTORE_PASSWORD_LOCATION_KEY), anyString())).thenReturn("/non-exists.txt"); + when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668"); + when(configuration.getBoolean(eq(Constants.SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY), anyBoolean())).thenReturn(true); + when(configuration.getString(eq(Constants.SINK_KAFKA_LINGER_MS_KEY), anyString())).thenReturn("1000"); + + sinkOrchestrator.getProducerProperties(configuration); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowWhenTrustStorePasswordIsNotExists() { + when(configuration.getString(eq(Constants.SINK_KAFKA_SSL_TRUSTSTORE_PASSWORD_LOCATION_KEY), anyString())).thenReturn("/non-exists.txt"); + when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668"); + when(configuration.getBoolean(eq(Constants.SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE_KEY), anyBoolean())).thenReturn(true); + when(configuration.getString(eq(Constants.SINK_KAFKA_LINGER_MS_KEY), anyString())).thenReturn("1000"); + + sinkOrchestrator.getProducerProperties(configuration); + } + @Test public void shouldThrowIllegalArgumentExceptionForInvalidLingerMs() throws Exception { when(configuration.getString(eq(Constants.SINK_KAFKA_BROKERS_KEY), anyString())).thenReturn("10.200.216.87:6668"); @@ -119,4 +163,12 @@ public void shouldReturnBigQuerySink() { Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter); assertThat(sinkFunction, instanceOf(BigQuerySink.class)); } + + private File writeDummyPasswordFile(String fileName, String password) throws IOException { + File passwordFile = temporaryFolder.newFile(fileName); + FileWriter writer = new FileWriter(passwordFile); + writer.write(password); + writer.close(); + return passwordFile; + } } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java index 23527ea52..7a1d0ff47 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/source/config/StreamConfigTest.java @@ -12,8 +12,12 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; import org.mockito.Mock; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Properties; @@ -31,6 +35,9 @@ public class StreamConfigTest { @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Mock private Configuration configuration; @@ -121,18 +128,17 @@ public void shouldParseKafkaPropertiesWithSSLConfigurations() { kafkaPropMap.put("bootstrap.servers", "localhost:9092"); kafkaPropMap.put("auto.offset.reset", "latest"); kafkaPropMap.put("auto.commit.enable", ""); - kafkaPropMap.put("ssl.keystore.password", "test-keystore-pass"); + kafkaPropMap.put(Constants.KAFKA_PROPS_SSL_KEYSTORE_PASSWORD_KEY, "test-keystore-pass"); kafkaPropMap.put("ssl.keystore.type", "JKS"); kafkaPropMap.put("ssl.keystore.location", "test-keystore-location"); kafkaPropMap.put("ssl.protocol", "SSL"); kafkaPropMap.put("ssl.key.password", "test-key-pass"); kafkaPropMap.put("ssl.truststore.type", "JKS"); kafkaPropMap.put("ssl.truststore.location", "test-truststore-location"); - kafkaPropMap.put("ssl.truststore.password", "test-truststore-pass"); + kafkaPropMap.put(Constants.KAFKA_PROPS_SSL_TRUSTSTORE_PASSWORD_KEY, "test-truststore-pass"); kafkaPropMap.put("security.protocol", "SSL"); - Properties properties = new Properties(); properties.putAll(kafkaPropMap); @@ -247,6 +253,52 @@ public void shouldParseMultipleAdditionalConsumerConfigs() { assertEquals("1000", secondStreamProperties.getProperty("offset.flush.interval.ms")); } + @Test + public void shouldParseSslPasswordsConfig() throws IOException { + File keystorePassword = writeDummyPasswordFile("ssl-keystore-password.txt", "keystore-password"); + File truststorePassword = writeDummyPasswordFile("ssl-truststore-password.txt", "truststore-password"); + when(configuration.getString(INPUT_STREAMS, "")) + .thenReturn(String.format("[{\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION\": \"%s\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION\": \"%s\"}]", keystorePassword.getAbsolutePath(), truststorePassword.getAbsolutePath())); + StreamConfig[] streamConfigs = StreamConfig.parse(configuration); + + Properties streamProperties = streamConfigs[0].getKafkaProps(configuration); + + assertEquals("keystore-password", streamProperties.getProperty("ssl.keystore.password")); + assertEquals("truststore-password", streamProperties.getProperty("ssl.truststore.password")); + } + + @Test + public void shouldOverrideExplicitSslPasswordConfig() throws IOException { + File keystorePassword = writeDummyPasswordFile("ssl-keystore-password.txt", "keystore-password"); + File truststorePassword = writeDummyPasswordFile("ssl-truststore-password.txt", "truststore-password"); + when(configuration.getString(INPUT_STREAMS, "")) + .thenReturn(String.format("[{\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD\": \"password\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION\": \"%s\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION\": \"%s\"}]", keystorePassword.getAbsolutePath(), truststorePassword.getAbsolutePath())); + StreamConfig[] streamConfigs = StreamConfig.parse(configuration); + + Properties streamProperties = streamConfigs[0].getKafkaProps(configuration); + + assertEquals("keystore-password", streamProperties.getProperty("ssl.keystore.password")); + assertEquals("truststore-password", streamProperties.getProperty("ssl.truststore.password")); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionWhenKeystorePasswordFileNotExists() { + when(configuration.getString(INPUT_STREAMS, "")) + .thenReturn(String.format("[{\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD_FILE_LOCATION\": \"%s\"}]", "non-exist.txt")); + StreamConfig[] streamConfigs = StreamConfig.parse(configuration); + + streamConfigs[0].getKafkaProps(configuration); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionWhenTruststorePasswordFileNotExists() { + when(configuration.getString(INPUT_STREAMS, "")) + .thenReturn(String.format("[{\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD_FILE_LOCATION\": \"%s\"}]", "non-exist.txt")); + StreamConfig[] streamConfigs = StreamConfig.parse(configuration); + + streamConfigs[0].getKafkaProps(configuration); + } + @Test(expected = IllegalArgumentException.class) public void shouldThrowIllegalArgumentExceptionIfAdditionalKafkaPropsNotMatchingPrefix() { String streamConfig = "[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL\": \"SASL_PLAINTEXT\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG\":\"org.apache.kafka.common.security.scram.ScramLoginModule required username=\\\"username\\\" password=\\\"password\\\";\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"false\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key\", \"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location\"} }, {\"INPUT_SCHEMA_TABLE\": \"data_stream_1\", \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_DATATYPE\": \"JSON\", \"INPUT_SCHEMA_JSON_SCHEMA\": \"{ \\\"$schema\\\": \\\"https://json-schema.org/draft/2020-12/schema\\\", \\\"$id\\\": \\\"https://example.com/product.schema.json\\\", \\\"title\\\": \\\"Product\\\", \\\"description\\\": \\\"A product from Acme's catalog\\\", \\\"type\\\": \\\"object\\\", \\\"properties\\\": { \\\"id\\\": { \\\"description\\\": \\\"The unique identifier for a product\\\", \\\"type\\\": \\\"string\\\" }, \\\"time\\\": { \\\"description\\\": \\\"event timestamp of the event\\\", \\\"type\\\": \\\"string\\\", \\\"format\\\" : \\\"date-time\\\" } }, \\\"required\\\": [ \\\"id\\\", \\\"time\\\" ] }\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"true\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\", \"SOURCE_KAFKA_CONSUMER_ADDITIONAL_CONFIGURATIONS\": {\"SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_KEY\": \"ssl_keystore_key_2\", \"CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION\": \"ssl_keystore_location_2\", \"SOURCE_KAFKA_CONSUMER_CONFIG_OFFSET_FLUSH_INTERVAL_MS\":\"1000\"} } ]"; @@ -546,4 +598,13 @@ public void shouldTrimLeadingAndTrailingWhitespacesFromParquetFilePathsWhenParqu Assert.assertArrayEquals(new String[]{"gs://some-parquet-path", "gs://another-parquet-path"}, streamConfigs[0].getParquetFilePaths()); } + + private File writeDummyPasswordFile(String fileName, String password) throws IOException { + File passwordFile = temporaryFolder.newFile(fileName); + FileWriter writer = new FileWriter(passwordFile); + writer.write(password); + writer.close(); + return passwordFile; + } + }