Skip to content

Commit

Permalink
Adding support for bigInt Unsigned Primary Key and connectionProperti…
Browse files Browse the repository at this point in the history
…es to shard-config. (#1877)
  • Loading branch information
VardhanThigle authored Sep 23, 2024
1 parent 2e0b521 commit 3b087bc
Show file tree
Hide file tree
Showing 16 changed files with 241 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
tables,
sourceDbURL,
null,
null,
0,
username,
password,
Expand All @@ -78,6 +79,7 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
List<String> tables,
String sourceDbURL,
String host,
String connectionProperties,
int port,
String username,
String password,
Expand Down Expand Up @@ -108,6 +110,9 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
case MYSQL:
if (sourceDbURL == null) {
sourceDbURL = "jdbc:mysql://" + host + ":" + port + "/" + dbName;
if (StringUtils.isNotBlank(connectionProperties)) {
sourceDbURL = sourceDbURL + "?" + connectionProperties;
}
}
for (Entry<String, String> entry :
MySqlConfigDefaults.DEFAULT_MYSQL_URL_PROPERTIES.entrySet()) {
Expand All @@ -118,6 +123,9 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
if (sourceDbURL == null) {
sourceDbURL = "jdbc:postgresql://" + host + ":" + port + "/" + dbName;
}
if (StringUtils.isNotBlank(connectionProperties)) {
sourceDbURL = sourceDbURL + "?" + connectionProperties;
}
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ private ImmutableMap<String, SourceColumnType> getTableCols(

private static final ImmutableMap<String, SourceColumnIndexInfo.IndexType> INDEX_TYPE_MAPPING =
ImmutableMap.<String, SourceColumnIndexInfo.IndexType>builder()
.put("BIGINT UNSIGNED", IndexType.BIG_INT_UNSIGNED)
.put("BIGINT", IndexType.NUMERIC)
.put("DATETIME", IndexType.DATE_TIME)
.put("INTEGER", IndexType.NUMERIC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper;

import static com.google.cloud.teleport.v2.source.reader.io.schema.SourceColumnIndexInfo.INDEX_TYPE_TO_CLASS;

import com.google.cloud.teleport.v2.source.reader.io.IoWrapper;
import com.google.cloud.teleport.v2.source.reader.io.exception.SuitableIndexNotFoundException;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.JdbcIOWrapperConfig;
Expand Down Expand Up @@ -243,7 +245,7 @@ private static TableConfig getTableConfig(
.forEach(tableConfigBuilder::withPartitionColum);
} else {
ImmutableSet<IndexType> supportedIndexTypes =
ImmutableSet.of(IndexType.NUMERIC, IndexType.STRING);
ImmutableSet.of(IndexType.NUMERIC, IndexType.STRING, IndexType.BIG_INT_UNSIGNED);
// As of now only Primary key index with Numeric type is supported.
// TODO:
// 1. support non-primary unique indexes.
Expand Down Expand Up @@ -274,11 +276,21 @@ private static TableConfig getTableConfig(
}
}

@VisibleForTesting
protected static java.lang.Class indexTypeToColumnClass(SourceColumnIndexInfo indexInfo)
throws SuitableIndexNotFoundException {
if (INDEX_TYPE_TO_CLASS.containsKey(indexInfo.indexType())) {
return INDEX_TYPE_TO_CLASS.get(indexInfo.indexType());
} else {
throw new SuitableIndexNotFoundException(
new Throwable("No class Mapping for IndexType " + indexInfo));
}
}

private static PartitionColumn partitionColumnFromIndexInfo(SourceColumnIndexInfo idxInfo) {
return PartitionColumn.builder()
.setColumnName(idxInfo.columnName())
// TODO(vardhanvthigle): handle other types
.setColumnClass((idxInfo.indexType() == IndexType.NUMERIC) ? Long.class : String.class)
.setColumnClass(indexTypeToColumnClass(idxInfo))
.setStringCollation(idxInfo.collationReference())
.setStringMaxLength(idxInfo.stringMaxLength())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.math.BigInteger;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
Expand All @@ -35,7 +36,11 @@ public class BoundaryExtractorFactory {
(BoundaryExtractor<Long>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromLongs(partitionColumn, resultSet, boundaryTypeMapper),
String.class, (BoundaryExtractor<String>) BoundaryExtractorFactory::fromStrings);
String.class, (BoundaryExtractor<String>) BoundaryExtractorFactory::fromStrings,
BigInteger.class,
(BoundaryExtractor<BigInteger>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromBigIntegers(partitionColumn, resultSet, boundaryTypeMapper));

/**
* Create a {@link BoundaryExtractor} for the required class.
Expand Down Expand Up @@ -84,6 +89,22 @@ private static Boundary<Long> fromLongs(
.build();
}

private static Boundary<java.math.BigInteger> fromBigIntegers(
PartitionColumn partitionColumn,
ResultSet resultSet,
@Nullable BoundaryTypeMapper boundaryTypeMapper)
throws SQLException {
Preconditions.checkArgument(partitionColumn.columnClass().equals(BigInteger.class));
resultSet.next();
return Boundary.<java.math.BigInteger>builder()
.setPartitionColumn(partitionColumn)
.setStart(resultSet.getBigDecimal(1).toBigInteger())
.setEnd(resultSet.getBigDecimal(2).toBigInteger())
.setBoundarySplitter(BoundarySplitterFactory.create(BigInteger.class))
.setBoundaryTypeMapper(boundaryTypeMapper)
.build();
}

private static Boundary<String> fromStrings(
PartitionColumn partitionColumn,
ResultSet resultSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.math.BigInteger;
import javax.annotation.Nullable;

@AutoValue
Expand Down Expand Up @@ -135,8 +137,16 @@ public SourceColumnIndexInfo build() {

public enum IndexType {
NUMERIC,
BIG_INT_UNSIGNED,
STRING,
DATE_TIME,
OTHER
};

// TODO(vardhanvthigle): handle other types
public static final ImmutableMap<IndexType, Class> INDEX_TYPE_TO_CLASS =
ImmutableMap.of(
IndexType.NUMERIC, Long.class,
IndexType.STRING, String.class,
IndexType.BIG_INT_UNSIGNED, BigInteger.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ static PipelineResult executeShardedMigration(
List.of(srcTable),
null,
shard.getHost(),
shard.getConnectionProperties(),
Integer.parseInt(shard.getPort()),
shard.getUserName(),
shard.getPassword(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ public void testConfigWithMySqlDefaultsFromOptions() {
public void testConfigWithMySqlUrlFromOptions() {
PCollection<Integer> dummyPCollection = pipeline.apply(Create.of(1));
pipeline.run();
JdbcIOWrapperConfig config =
JdbcIOWrapperConfig configWithConnectionProperties =
OptionsToConfigBuilder.getJdbcIOWrapperConfig(
SQLDialect.MYSQL,
List.of("table1", "table2"),
null,
"myhost",
"testParam=testValue",
3306,
"myuser",
"mypassword",
Expand All @@ -88,7 +89,29 @@ public void testConfigWithMySqlUrlFromOptions() {
10,
0,
Wait.on(dummyPCollection));
assertThat(config.sourceDbURL())

JdbcIOWrapperConfig configWithoutConnectionProperties =
OptionsToConfigBuilder.getJdbcIOWrapperConfig(
SQLDialect.MYSQL,
List.of("table1", "table2"),
null,
"myhost",
null,
3306,
"myuser",
"mypassword",
"mydb",
null,
"com.mysql.jdbc.Driver",
"mysql-jar",
10,
0,
Wait.on(dummyPCollection));

assertThat(configWithConnectionProperties.sourceDbURL())
.isEqualTo(
"jdbc:mysql://myhost:3306/mydb?testParam=testValue&allowMultiQueries=true&autoReconnect=true&maxReconnects=10");
assertThat(configWithoutConnectionProperties.sourceDbURL())
.isEqualTo(
"jdbc:mysql://myhost:3306/mydb?allowMultiQueries=true&autoReconnect=true&maxReconnects=10");
}
Expand Down Expand Up @@ -130,12 +153,30 @@ public void testConfigWithPostgreSQLDefaultsFromOptions() {
public void testConfigWithPostgreSqlUrlFromOptions() {
PCollection<Integer> dummyPCollection = pipeline.apply(Create.of(1));
pipeline.run();
JdbcIOWrapperConfig config =
JdbcIOWrapperConfig configWithConnectionParameters =
OptionsToConfigBuilder.getJdbcIOWrapperConfig(
SQLDialect.POSTGRESQL,
List.of("table1", "table2"),
null,
"myhost",
"testParam=testValue",
5432,
"myuser",
"mypassword",
"mydb",
null,
"com.mysql.jdbc.Driver",
"mysql-jar",
10,
0,
Wait.on(dummyPCollection));
JdbcIOWrapperConfig configWithoutConnectionParameters =
OptionsToConfigBuilder.getJdbcIOWrapperConfig(
SQLDialect.POSTGRESQL,
List.of("table1", "table2"),
null,
"myhost",
"",
5432,
"myuser",
"mypassword",
Expand All @@ -146,7 +187,10 @@ public void testConfigWithPostgreSqlUrlFromOptions() {
10,
0,
Wait.on(dummyPCollection));
assertThat(config.sourceDbURL()).isEqualTo("jdbc:postgresql://myhost:5432/mydb");
assertThat(configWithoutConnectionParameters.sourceDbURL())
.isEqualTo("jdbc:postgresql://myhost:5432/mydb");
assertThat(configWithConnectionParameters.sourceDbURL())
.isEqualTo("jdbc:postgresql://myhost:5432/mydb?testParam=testValue");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.math.BigInteger;
import java.sql.SQLException;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -309,4 +310,34 @@ public void testReadWithUniformPartitionFeatureFlag() throws RetriableSchemaDisc
jdbcIOWrapperWithFeatureEnabled.getTableReaders().values().stream().findFirst().get())
.isInstanceOf(ReadWithUniformPartitions.class);
}

@Test
public void testIndexTypeToColumnClass() {

assertThat(
JdbcIoWrapper.indexTypeToColumnClass(
SourceColumnIndexInfo.builder()
.setColumnName("col1")
.setIndexType(IndexType.BIG_INT_UNSIGNED)
.setOrdinalPosition(1)
.setIndexName("PRIMARY")
.setIsPrimary(true)
.setCardinality(42L)
.setIsUnique(true)
.build()))
.isEqualTo(BigInteger.class);
assertThrows(
SuitableIndexNotFoundException.class,
() ->
JdbcIoWrapper.indexTypeToColumnClass(
SourceColumnIndexInfo.builder()
.setColumnName("col1")
.setIndexType(IndexType.OTHER)
.setOrdinalPosition(1)
.setIndexName("PRIMARY")
.setIsPrimary(true)
.setCardinality(42L)
.setIsUnique(true)
.build()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationMapper;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -88,6 +89,32 @@ public void testFromIntegers() throws SQLException {
null));
}

@Test
public void testFromBigIntegers() throws SQLException {
final BigInteger unsignedBigIntMax = new BigInteger("18446744073709551615");
PartitionColumn partitionColumn =
PartitionColumn.builder().setColumnName("col1").setColumnClass(BigInteger.class).build();
BoundaryExtractor<BigInteger> extractor = BoundaryExtractorFactory.create(BigInteger.class);
when(mockResultSet.next()).thenReturn(true);
when(mockResultSet.getBigDecimal(1)).thenReturn(new BigDecimal(BigInteger.ZERO));
// BigInt Unsigned Max in MySQL
when(mockResultSet.getBigDecimal(2)).thenReturn(new BigDecimal(unsignedBigIntMax));
Boundary<BigInteger> boundary = extractor.getBoundary(partitionColumn, mockResultSet, null);

assertThat(boundary.start()).isEqualTo(BigInteger.ZERO);
assertThat(boundary.end()).isEqualTo(unsignedBigIntMax);
assertThat(boundary.split(null).getLeft().end())
.isEqualTo((unsignedBigIntMax.divide(BigInteger.TWO)));
// Mismatched Type
assertThrows(
IllegalArgumentException.class,
() ->
extractor.getBoundary(
PartitionColumn.builder().setColumnName("col1").setColumnClass(long.class).build(),
mockResultSet,
null));
}

@Test
public void testFromStrings() throws SQLException {
PartitionColumn partitionColumn =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ private Map<String, List<Map<String, Object>>> getExpectedData() {
expectedData.put("set", createRows("set", "v1,v2", "NULL"));
expectedData.put(
"integer_unsigned", createRows("integer_unsigned", "0", "42", "4294967296", "NULL"));
expectedData.put(
"bigint_unsigned_pk", createRows("bigint_unsigned", "0", "42", "18446744073709551615"));
expectedData.put("string_pk", createRows("string", "Cloud", "Google", "Spanner"));
return expectedData;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,17 @@ CREATE TABLE set_table (
set_col SET('v1', 'v2', 'v3') DEFAULT NULL
);


CREATE TABLE `bigint_unsigned_pk_table` (
`id` BIGINT UNSIGNED PRIMARY KEY,
`bigint_unsigned_col` BIGINT UNSIGNED NOT NULL
);

CREATE TABLE `string_pk_table` (
`id` STRING(50) PRIMARY KEY,
`string_col` STRING(50) NOT NULL
);

ALTER TABLE `bigint_table` MODIFY `id` INT AUTO_INCREMENT;
ALTER TABLE `bigint_unsigned_table` MODIFY `id` INT AUTO_INCREMENT;
ALTER TABLE `binary_table` MODIFY `id` INT AUTO_INCREMENT;
Expand Down Expand Up @@ -297,6 +308,8 @@ INSERT INTO `year_table` (`year_col`) VALUES (2022);
INSERT INTO `year_table` (`year_col`) VALUES (1901);
INSERT INTO `year_table` (`year_col`) VALUES (2155);
INSERT INTO `set_table` (`set_col`) VALUES ('v1,v2');
INSERT INTO `bigint_unsigned_pk_table` (`id`, `bigint_unsigned_col`) VALUES ('0', '0'), ('42', '42'), ('18446744073709551615', '18446744073709551615');
INSERT INTO `string_pk_table` (`id`, `string_col`) VALUES ('Cloud', 'Cloud'), ('Google', 'Google'), ('Spanner','Spanner');

INSERT INTO `bigint_table` (`bigint_col`) VALUES (NULL);
INSERT INTO `bigint_unsigned_table` (`bigint_unsigned_col`) VALUES (NULL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,13 @@ CREATE TABLE spatial_point (
CREATE TABLE spatial_polygon (
id INT64 NOT NULL,
area STRING(MAX),
) PRIMARY KEY(id);
) PRIMARY KEY(id);
CREATE TABLE `bigint_unsigned_pk_table` (
id NUMERIC NOT NULL,
bigint_unsigned_col NUMERIC NOT NULL,
) PRIMARY KEY(id);

CREATE TABLE `string_pk_table` (
id STRING(max) NOT NULL,
string_col STRING(max) NOT NULL
) PRIMARY KEY(id);
Loading

0 comments on commit 3b087bc

Please sign in to comment.