Skip to content

Commit

Permalink
CassandraDriverConfigLoader from GCS (#2077)
Browse files Browse the repository at this point in the history
  • Loading branch information
VardhanThigle authored Dec 24, 2024
1 parent 0962cb0 commit 177a218
Show file tree
Hide file tree
Showing 6 changed files with 1,845 additions and 0 deletions.
12 changes: 12 additions & 0 deletions v2/spanner-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.17.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>3.12.4</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.utils;

import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigException;
import java.io.FileNotFoundException;
import java.net.URL;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A common static utility class that allows the spanner migration pipelines to ingest Cassandra
* Driver config file from GCS. Cassandra has a structured config file to accept all the driver
* parameters, be it list of host ip addresses, credentials, retry policy and many more. Most of
* these parameters are very specific to the Cassandra Database. Refer to the <a
* href=>https://docs.datastax.com/en/developer/java-driver/4.3/manual/core/configuration/reference/index.html>reference
* configuration</a> for the file format.
*/
public final class CassandraDriverConfigLoader {

private static final Logger LOG = LoggerFactory.getLogger(CassandraDriverConfigLoader.class);
private static final ImmutableMap<String, TypedDriverOption> OPTIONS_SUPPORTED_BY_DRIVER =
getOptionsSupportedByDriver();

/**
* Load the Cassandra Config from a file as a {@link DriverConfigLoader}.
*
* @param path A complete gcs path to the config file of the form "gs://path/to/file".
* @return DriverConfigLoader.
* @throws FileNotFoundException - If file is not found at specified path.
*/
public static DriverConfigLoader loadFile(String path) throws FileNotFoundException {
URL url = loadSingleFile(path);
LOG.debug("Loaded Cassandra Driver config from path {}", path);
try {
DriverConfigLoader.fromUrl(url).getInitialConfig();
return DriverConfigLoader.fromUrl(url);
} catch (ConfigException.Parse parseException) {
LOG.error(
"Parsing error while parsing Cassandra Driver config from path {}", path, parseException);
throw parseException;
}
}

/**
* Load the Cassandra Config from a file as a {@link java.io.Serializable} {@link OptionsMap}.
* This {@link OptionsMap} can be stored in any object that needs to implement {@link
* java.io.Serializable}. At the time of opening a connection to Cassandra, it can be deserialized
* by {@link CassandraDriverConfigLoader#fromOptionsMap(OptionsMap)}. Note: Implementation Detail,
* Cassandra Driver does not provide a direct method to convert a link {@link DriverConfigLoader}
* into an {@link OptionsMap}, or build an {@link OptionsMap} from a file.
*
* @param path A complete gcs path to the config file of the form "gs://path/to/file".
* @return DriverConfigLoader.
* @throws FileNotFoundException - If file is not found at specified path.
*/
public static OptionsMap getOptionsMapFromFile(String path) throws FileNotFoundException {
OptionsMap optionsMap = new OptionsMap();
DriverConfigLoader configLoader = loadFile(path);
configLoader
.getInitialConfig()
.getProfiles()
.forEach(
(profileName, profile) ->
profile.entrySet().forEach(e -> putInOptionsMap(optionsMap, profileName, e)));

return optionsMap;
}

/**
* Load the {@link DriverConfigLoader} from {@link java.io.Serializable} {@link OptionsMap} which
* was obtained as a part of {@link CassandraDriverConfigLoader#getOptionsMapFromFile(String)}.
*
* @param optionsMap
* @return DriverConfigLoader.
*/
public static DriverConfigLoader fromOptionsMap(OptionsMap optionsMap) {
return DriverConfigLoader.fromMap(optionsMap);
}

@VisibleForTesting
protected static URL loadSingleFile(String path) throws FileNotFoundException {
URL[] urls = JarFileReader.saveFilesLocally(path);
if (urls.length == 0) {
LOG.error("Could not load any Cassandra driver config file from specified path {}", path);
throw (new FileNotFoundException("No file found in path " + path));
}
if (urls.length > 1) {
LOG.error(
"Need to provide a single Cassandra driver config file in the specified path {}. Found {} ",
path,
urls);
throw (new IllegalArgumentException(
String.format(
"Need to provide a single Cassandra driver config file in the specified path %s. Found %d files",
path, urls.length)));
}
return urls[0];
}

@VisibleForTesting
protected static void putInOptionsMap(
OptionsMap optionsMap, String profileName, Entry<String, Object> e) {

TypedDriverOption option = OPTIONS_SUPPORTED_BY_DRIVER.get(e.getKey());
if (Objects.equal(option, null)) {
LOG.error(
"Unknown Cassandra Option {}, Options supported by driver = {}",
e.getKey(),
OPTIONS_SUPPORTED_BY_DRIVER);
throw new IllegalArgumentException(
String.format(
"Unknown Cassandra Driver Option %s. Supported Options = %s",
e.getKey(), OPTIONS_SUPPORTED_BY_DRIVER));
}
optionsMap.put(profileName, option, e.getValue());
}

private static ImmutableMap<String, TypedDriverOption> getOptionsSupportedByDriver() {
ImmutableMap.Builder<String, TypedDriverOption> mapBuilder = ImmutableMap.builder();
TypedDriverOption.builtInValues().forEach(e -> mapBuilder.put(e.getRawOption().getPath(), e));
return mapBuilder.build();
}

private CassandraDriverConfigLoader() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** TODO: rename this to FileReader. */
public class JarFileReader {

private static final Logger LOG = LoggerFactory.getLogger(JarFileReader.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.utils;

import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONTACT_POINTS;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.RETRY_POLICY_CLASS;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mockStatic;

import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import com.typesafe.config.ConfigException;
import java.io.FileNotFoundException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;

/** Test class for {@link CassandraDriverConfigLoader}. */
@RunWith(MockitoJUnitRunner.class)
public class CassandraDriverConfigLoaderTest {
MockedStatic mockFileReader;

@Before
public void initialize() {
mockFileReader = mockStatic(JarFileReader.class);
}

@Test
public void testCassandraDriverConfigLoaderBasic()
throws FileNotFoundException, MalformedURLException {
String testGcsPath = "gs://smt-test-bucket/cassandraConfig.conf";
URL testUrl = Resources.getResource("test-cassandra-config.conf");
mockFileReader
.when(() -> JarFileReader.saveFilesLocally(testGcsPath))
.thenReturn(new URL[] {testUrl});
DriverConfigLoader driverConfigLoader = CassandraDriverConfigLoader.loadFile(testGcsPath);
assertThat(
driverConfigLoader
.getInitialConfig()
.getProfiles()
.get("default")
.getStringList(CONTACT_POINTS))
.isEqualTo(List.of("127.0.0.1:9042", "127.0.0.2:9042"));
;
assertThat(
driverConfigLoader
.getInitialConfig()
.getProfiles()
.get("default")
.getString(RETRY_POLICY_CLASS))
.isEqualTo("DefaultRetryPolicy");
}

@Test
public void testCassandraDriverConfigLoadError()
throws FileNotFoundException, MalformedURLException {
String testGcsPathNotFound = "gs://smt-test-bucket/cassandraConfigNotFound.conf";
String testGcsPathList =
"gs://smt-test-bucket/cassandraConfig1.conf,gs://smt-test-bucket/cassandraConfig2.conf";

URL testUrl = Resources.getResource("test-cassandra-config-parse-err.conf");
mockFileReader
.when(() -> JarFileReader.saveFilesLocally(testGcsPathNotFound))
.thenReturn(new URL[] {});
mockFileReader
.when(() -> JarFileReader.saveFilesLocally(testGcsPathList))
.thenReturn(
new URL[] {
Resources.getResource("test-cassandra-config.conf"),
Resources.getResource("test-cassandra-config.conf")
});
assertThrows(
FileNotFoundException.class,
() -> CassandraDriverConfigLoader.loadFile(testGcsPathNotFound));
assertThrows(
IllegalArgumentException.class,
() -> CassandraDriverConfigLoader.loadFile(testGcsPathList));
}

@Test
public void testCassandraDriverConfigParseError()
throws FileNotFoundException, MalformedURLException {
String testGcsPath = "gs://smt-test-bucket/cassandraConfig.conf";
URL testUrl = Resources.getResource("test-cassandra-config-parse-err.conf");
mockFileReader
.when(() -> JarFileReader.saveFilesLocally(testGcsPath))
.thenReturn(new URL[] {testUrl});
assertThrows(
ConfigException.Parse.class, () -> CassandraDriverConfigLoader.loadFile(testGcsPath));
}

@Test
public void testOptionsMapConversion() throws FileNotFoundException {

String testGcsPath = "gs://smt-test-bucket/cassandraConfig.conf";
URL testUrl = Resources.getResource("test-cassandra-config.conf");
mockFileReader
.when(() -> JarFileReader.saveFilesLocally(testGcsPath))
.thenReturn(new URL[] {testUrl});
DriverConfigLoader driverConfigLoaderDirect = CassandraDriverConfigLoader.loadFile(testGcsPath);
OptionsMap optionsMap = CassandraDriverConfigLoader.getOptionsMapFromFile(testGcsPath);
DriverConfigLoader driverConfigLoaderFromOptionsMap =
CassandraDriverConfigLoader.fromOptionsMap(optionsMap);
ImmutableMap<String, ImmutableMap<String, String>> directLoadMap =
driverConfigMap(driverConfigLoaderDirect);
ImmutableMap<String, ImmutableMap<String, String>> fromOptionsMap =
driverConfigMap(driverConfigLoaderFromOptionsMap);

assertThat(directLoadMap).isEqualTo(fromOptionsMap);

assertThrows(
IllegalArgumentException.class,
() -> {
OptionsMap optionsMapToLoad = new OptionsMap();
CassandraDriverConfigLoader.putInOptionsMap(
optionsMapToLoad, "default", new SimpleEntry<>("Unsupported", "Unsupported"));
});
}

private static ImmutableMap<String, ImmutableMap<String, String>> driverConfigMap(
DriverConfigLoader driverConfigLoaderDirect) {
ImmutableMap.Builder<String, ImmutableMap<String, String>> driverConfigMap =
ImmutableMap.builder();
driverConfigLoaderDirect
.getInitialConfig()
.getProfiles()
.forEach(
(profile, options) -> {
ImmutableMap.Builder<String, String> profileMapBuilder = ImmutableMap.builder();
options
.entrySet()
.forEach(
e -> profileMapBuilder.put(e.getKey().toString(), e.getValue().toString()));
driverConfigMap.put(profile, profileMapBuilder.build());
});
return driverConfigMap.build();
}

@After
public void cleanup() {
mockFileReader.close();
mockFileReader = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Configuration for the DataStax Java driver for Apache Cassandra®.
# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md.
# This file has an intentional parsing error, to help test exception handling for cases where the config file does not get parsed.
# DO NOT USE FOR PRODUCTION.

datastax-java-driver {
basic.contact-points = [ "127.0.0.1:9042", ]
}
}
Loading

0 comments on commit 177a218

Please sign in to comment.