indexes =
+ this.indexToCharacterCache.keySet().stream()
+ .sorted()
+ .collect(ImmutableList.toImmutableList());
+ for (int i = 0; i < indexes.size(); i++) {
+ if (indexes.get(i) != i) {
+ throw new IllegalStateException(
+ "Discontinuous index found at position "
+ + i
+ + " please check the collation order query"
+ + " for "
+ + collationReference()
+ + " for character set = "
+ + "index-type = "
+ + indexType());
+ }
+ if (!charToIndexCache.containsKey(indexToCharacterCache.get(indexes.get(i)))) {
+ throw new IllegalStateException(
+ "index which is not part of the character set found at position "
+ + i
+ + " index is "
+ + indexes.get(i)
+ + " index character is "
+ + indexToCharacterCache.get(indexes.get(i))
+ + " please check the collation order query"
+ + " for "
+ + collationReference()
+ + " for character set = "
+ + "index-type = "
+ + indexType());
+ }
+ if (charToIndexCache.get(indexToCharacterCache.get(indexes.get(i))) != indexes.get(i)) {
+ throw new IllegalStateException(
+ "index not mapping onto itself found at position "
+ + i
+ + " index is "
+ + indexes.get(i)
+ + " index character is "
+ + indexToCharacterCache.get(indexes.get(i))
+ + " index character is mapped to "
+ + charToIndexCache.get(indexToCharacterCache.get(indexes.get(i)))
+ + " please check the collation order query"
+ + " for "
+ + collationReference()
+ + " for character set = "
+ + "index-type = "
+ + indexType());
+ }
+ }
+ CollationIndex index = autoBuild();
+ logger.info(
+ "Initialized Index {} for {}, with {} characters, {} unique characters, and {} empty characters",
+ index.indexType(),
+ index.collationReference(),
+ index.characterToIndex().size(),
+ index.indexToCharacter().size());
+ return index;
+ }
+ }
+
+ public enum CollationIndexType {
+ /**
+ * An index that tracks the collation ordering of characters at every position of a string,
+ * except for the trailing (last) position in the specific case of a PAD SPACE comparison.
+ */
+ ALL_POSITIONS,
+ /**
+ * An index that tracks the collation ordering of characters at the trailing (last) position of
+ * a string in the case of a PAD SPACE comparison.
+ */
+ TRAILING_POSITION_PAD_SPACE
+ }
+}
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationMapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationMapper.java
new file mode 100644
index 0000000000..24ef3e80b1
--- /dev/null
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationMapper.java
@@ -0,0 +1,346 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper;
+
+import com.google.auto.value.AutoValue;
+import com.google.auto.value.extension.memoized.Memoized;
+import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.UniformSplitterDBAdapter;
+import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationIndex.CollationIndexType;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maps strings read from the database to {@link BigInteger} values, and back, based on the
+ * collation ordering of their characters.
+ *
+ * <p><b>Basic requirement for the mapping</b>
+ *
+ * <p>Consider two strings read from the database, stringLeft and stringRight, and say {@link
+ * CollationMapper#mapString(String, int)} maps them to bigIntLeft and bigIntRight. Consider
+ * bigIntMean as the mean of bigIntLeft and bigIntRight, and stringSplit as the output of {@link
+ * CollationMapper#unMapString(BigInteger)} for bigIntMean. Then,
+ *
+ * <p>{@code SELECT stringLeft <= stringSplit} and {@code SELECT stringRight >= stringSplit} must
+ * always hold true. Also, split points of ranges of the same column should never compare equal.
+ */
+@AutoValue
+public abstract class CollationMapper implements Serializable {
+
+  private static final Logger logger = LoggerFactory.getLogger(CollationMapper.class);
+
+  /** Details about the collation. */
+  public abstract CollationReference collationReference();
+
+  /**
+   * Index of characters by their position in the collation order. Helps us map a string to a big
+   * integer for all positions. {@link #allPositionsIndex()} is the primarily referred index,
+   * except for the trailing position in a pad-space comparison.
+   */
+  public abstract CollationIndex allPositionsIndex();
+
+  /**
+   * Index of characters by their position in the collation order. Helps us map a string to a big
+   * integer for the trailing position in case a pad-space comparison is needed. A pad-space
+   * comparison is needed in case of a PAD SPACE collation in MySQL, or a CHAR column in PG.
+   *
+   * <p>{@link #trailingPositionsPadSpace()} is referred only for the trailing position in a
+   * pad-space comparison.
+   */
+  public abstract CollationIndex trailingPositionsPadSpace();
+
+  /**
+   * Empty characters. MySQL ignores empty characters in comparisons. For example, consider the
+   * below query that adds a control-z between a and b. {@code SELECT CONCAT('a',
+   * CONVERT(UNHEX('001A') using utf8mb4), 'b') = 'ab' COLLATE utf8mb4_0900_ai_ci;} returns 1.
+   * TODO(vardhanvthigle): Check this behavior for PG and other databases.
+   */
+  public abstract ImmutableSet<Character> emptyCharacters();
+
+  /**
+   * Space characters. MySQL ignores trailing space characters in comparisons for PAD SPACE
+   * collations and PG has the same behavior for CHAR columns. Note that there are various
+   * codepoints that can potentially represent a space, like the ascii space or non-breaking space
+   * (UNHEX('C2A0')) when the collation is PAD SPACE. These have the same behavior as the ascii
+   * space as far as trailing or non-trailing comparison is concerned.
+   */
+  public abstract ImmutableSet<Character> spaceCharacters();
+
+  /** All {@link #spaceCharacters()} concatenated into a single string; computed once. */
+  @Memoized
+  String allSpaceCharacters() {
+    return this.spaceCharacters().stream().map(String::valueOf).collect(Collectors.joining(""));
+  }
+
+  /**
+   * Regular expression (a quoted character class) matching any one of the {@link
+   * #emptyCharacters()}, or an empty string if there are no empty characters; computed once.
+   */
+  @Memoized
+  String emptyReplacePattern() {
+    if (this.emptyCharacters().isEmpty()) {
+      return "";
+    }
+    return "["
+        + Pattern.quote(
+            this.emptyCharacters().stream().map(String::valueOf).collect(Collectors.joining("")))
+        + "]";
+  }
+
+  /**
+   * Map a {@link String} to {@link BigInteger}.
+   *
+   * @param element String. May be null.
+   * @param lengthToPad maximum length of the string as per the column width.
+   * @return mapped big integer, or -1 if {@code element} is null or is empty once the empty
+   *     characters (and, for a pad-space collation, the trailing space characters) are removed.
+   *     <p><b>Note:</b>
+   *     <p>The logic for mapping the string has to take care of various database nuances like:
+   *     <ul>
+   *       <li>Control characters like control-z are ignored for comparisons at all positions.
+   *       <li>Space is ignored for comparison at trailing positions. Depending on the
+   *           character-set, there could be more than one character that represents a space like
+   *           EM quad, non-breaking space.
+   *       <li>Space is not ignored at non-trailing positions in the comparison and characters like
+   *           tab or new-line could compare less than space at non-trailing positions.
+   *     </ul>
+   */
+  public BigInteger mapString(@Nullable String element, int lengthToPad) {
+
+    if (element == null) {
+      return BigInteger.valueOf(-1);
+    }
+
+    // MySQL ignores empty character in string comparisons.
+    // For example (adding control-z between a and b):
+    // SELECT CONCAT('a', CONVERT(UNHEX('001A') using utf8mb4), 'b') = 'ab' COLLATE
+    // utf8mb4_0900_ai_ci;
+    // returns 1.
+    // Remove all the empty characters.
+    element = element.replaceAll(emptyReplacePattern(), "");
+
+    // Remove trailing spaces for padSpace comparison.
+    if (this.collationReference().padSpace()) {
+      element = StringUtils.stripEnd(element, allSpaceCharacters());
+    }
+    if (element.isEmpty()) {
+      return BigInteger.valueOf(-1);
+    }
+
+    // Convert the string to BigInteger, treating each character as a digit whose base is the
+    // size of the index that applies at that position.
+    BigInteger ret = BigInteger.ZERO;
+    int lastIndex = element.length() - 1;
+    for (int index = 0; index <= lastIndex; index++) {
+      boolean lastCharacter = (index == lastIndex);
+      ret =
+          ret.multiply(BigInteger.valueOf(getCharsetSize(lastCharacter)))
+              .add(BigInteger.valueOf(getOrdinalPosition(element.charAt(index), lastCharacter)));
+    }
+
+    // Pad up to lengthToPad. Padding positions are never the last character of the element, so
+    // the base is always the size of the all-positions index.
+    int padCount = lengthToPad - element.length();
+    if (padCount > 0) {
+      ret = ret.multiply(BigInteger.valueOf(getCharsetSize(false)).pow(padCount));
+    }
+    return ret;
+  }
+
+  /**
+   * Unmap a {@link BigInteger} back to {@link String}.
+   *
+   * @param element BigInteger to unmap.
+   * @return unmapped String. An {@code element} of -1 unmaps to the empty string.
+   *     <p><b>Note:</b>
+   *     <p>The logic for unmapping has to take care of the same database nuances as {@link
+   *     #mapString(String, int)}:
+   *     <ul>
+   *       <li>Control characters like control-z are ignored for comparisons at all positions.
+   *       <li>Space is ignored for comparison at trailing positions. Depending on the
+   *           character-set, there could be more than one character that represents a space like
+   *           EM quad, non-breaking space.
+   *       <li>Space is not ignored at non-trailing positions in the comparison and characters like
+   *           tab or new-line could compare less than space at non-trailing positions.
+   *     </ul>
+   */
+  public String unMapString(BigInteger element) {
+
+    if (element.equals(BigInteger.valueOf(-1))) {
+      return "";
+    }
+
+    // Base case: the value represents a single character. Compare by value (signum) rather than
+    // by reference, since a zero-valued BigInteger need not be the BigInteger.ZERO instance.
+    if (element.signum() == 0) {
+      char c = getCharacterFromPosition(0L, true);
+      return String.valueOf(c);
+    }
+
+    // Peel off digits from the least significant end. The first digit peeled off corresponds to
+    // the trailing position of the string.
+    StringBuilder word = new StringBuilder();
+    boolean trailingPosition = true;
+    while (element.signum() != 0) {
+      BigInteger charsetSize = BigInteger.valueOf(getCharsetSize(trailingPosition));
+
+      BigInteger remainder = element.mod(charsetSize);
+      char c = getCharacterFromPosition(remainder.longValue(), trailingPosition);
+      word.append(c);
+
+      element = element.divide(charsetSize);
+      trailingPosition = false;
+    }
+    // Characters were appended from the trailing position backwards.
+    return word.reverse().toString();
+  }
+
+  /**
+   * Creates a builder for the given collation, with both the all-positions and the
+   * trailing-position indexes pre-configured with their index type and collation reference.
+   *
+   * @param collationReference details about the collation.
+   * @return builder.
+   */
+  public static Builder builder(CollationReference collationReference) {
+    Builder builder =
+        new AutoValue_CollationMapper.Builder().setCollationReference(collationReference);
+
+    builder
+        .allPositionsIndexBuilder()
+        .setIndexType(CollationIndexType.ALL_POSITIONS)
+        .setCollationReference(collationReference);
+    builder
+        .trailingPositionsPadSpaceBuilder()
+        .setIndexType(CollationIndexType.TRAILING_POSITION_PAD_SPACE)
+        .setCollationReference(collationReference);
+    return builder;
+  }
+
+  /**
+   * Builds a {@link CollationMapper} by running the collation order query of the given adapter on
+   * the given connection.
+   *
+   * @param connection connection to the database. Not closed by this method.
+   * @param dbAdapter provides the dialect specific collation order query.
+   * @param collationReference details about the collation.
+   * @return mapper built from the first result set returned by the query.
+   * @throws SQLException if thrown while executing the query or reading its result.
+   * @throws IllegalStateException if the query does not return any result set.
+   */
+  public static CollationMapper fromDB(
+      Connection connection,
+      UniformSplitterDBAdapter dbAdapter,
+      CollationReference collationReference)
+      throws SQLException {
+    String query =
+        dbAdapter.getCollationsOrderQuery(
+            collationReference.dbCharacterSet(), collationReference.dbCollation());
+    CollationMapper mapper = null;
+    try (Statement statement = connection.createStatement()) {
+      statement.setEscapeProcessing(false);
+      // Due to https://bugs.mysql.com/bug.php?id=108195 affecting the version of connector,
+      // we can't use executeQuery for a multi line complex query.
+      boolean foundResultSet = statement.execute(query);
+      // The recommended workaround is a while(true) loop, we limit the iterations to number of
+      // lines in the query (as it's impossible to have more resultsets than that as hangs are not
+      // easy to debug in dataflow)
+      long maxIterations = query.lines().count() + 1;
+      for (long i = 0; i < maxIterations; i++) {
+        if (foundResultSet) {
+          try (ResultSet rs = statement.getResultSet()) {
+            mapper = fromResultSet(rs, collationReference);
+          }
+          break;
+        }
+        foundResultSet = statement.getMoreResults();
+        // No result set and an update count of -1 means there are no more results.
+        Preconditions.checkState(
+            foundResultSet || statement.getUpdateCount() != -1,
+            "No result sets found while querying collation for %s",
+            collationReference);
+      }
+    } catch (SQLException e) {
+      // Beam will auto retry the exceptions in run time.
+      logger.error(
+          "Exception while getting collation order for {}, exception = {}, query = {}",
+          collationReference,
+          e,
+          query);
+      throw e;
+    }
+    Preconditions.checkState(
+        mapper != null, "No result sets found while querying collation for %s", collationReference);
+    return mapper;
+  }
+
+  /** Size of the index that applies at a position: trailing index only if last and pad-space. */
+  private long getCharsetSize(boolean lastCharacter) {
+    return (lastCharacter && collationReference().padSpace())
+        ? this.trailingPositionsPadSpace().getCharsetSize()
+        : this.allPositionsIndex().getCharsetSize();
+  }
+
+  /** Ordinal position of a character in the index that applies at its position. */
+  private long getOrdinalPosition(Character c, boolean lastCharacter) {
+    return (lastCharacter && collationReference().padSpace())
+        ? this.trailingPositionsPadSpace().getOrdinalPosition(c)
+        : this.allPositionsIndex().getOrdinalPosition(c);
+  }
+
+  /**
+   * Character at an ordinal position in the index that applies. The first iteration of unmapping
+   * corresponds to the trailing position of the string.
+   */
+  private Character getCharacterFromPosition(long ordinalPosition, boolean firstIteration) {
+    return (firstIteration && collationReference().padSpace())
+        ? this.trailingPositionsPadSpace().getCharacterFromPosition(ordinalPosition)
+        : this.allPositionsIndex().getCharacterFromPosition(ordinalPosition);
+  }
+
+  /** Builds a mapper by registering every remaining row of the collation order result set. */
+  private static CollationMapper fromResultSet(ResultSet rs, CollationReference collationReference)
+      throws SQLException {
+    Builder builder = builder(collationReference);
+    while (rs.next()) {
+      builder.addCharacter(CollationOrderRow.fromRS(rs));
+    }
+    return builder.build();
+  }
+
+  /** Builder for {@link CollationMapper}. Obtain it via {@link #builder(CollationReference)}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    abstract Builder setCollationReference(CollationReference collationReference);
+
+    abstract CollationReference collationReference();
+
+    abstract CollationIndex.Builder allPositionsIndexBuilder();
+
+    abstract CollationIndex.Builder trailingPositionsPadSpaceBuilder();
+
+    abstract ImmutableSet.Builder<Character> emptyCharactersBuilder();
+
+    abstract ImmutableSet.Builder<Character> spaceCharactersBuilder();
+
+    /**
+     * Registers a row of the collation order query output.
+     *
+     * <ul>
+     *   <li>An empty character is only added to the empty characters and to neither index.
+     *   <li>A space character is added to the space characters and to the all-positions index,
+     *       but not to the trailing-position index.
+     *   <li>Any other character is added to both indexes.
+     * </ul>
+     *
+     * @param collationOrderRow row of the collation order query output.
+     * @return this builder.
+     */
+    public Builder addCharacter(CollationOrderRow collationOrderRow) {
+
+      logger.debug(
+          "Registering character order for {}, character-details = {}",
+          collationReference(),
+          collationOrderRow);
+      if (collationOrderRow.isEmpty()) {
+        emptyCharactersBuilder().add(collationOrderRow.charsetChar());
+        return this;
+      }
+      if (collationOrderRow.isSpace()) {
+        spaceCharactersBuilder().add(collationOrderRow.charsetChar());
+      }
+      allPositionsIndexBuilder()
+          .addCharacter(
+              collationOrderRow.charsetChar(),
+              collationOrderRow.equivalentChar(),
+              collationOrderRow.codepointRank());
+      if (!collationOrderRow.isSpace()) {
+        trailingPositionsPadSpaceBuilder()
+            .addCharacter(
+                collationOrderRow.charsetChar(),
+                collationOrderRow.equivalentCharPadSpace(),
+                collationOrderRow.codepointRankPadSpace());
+      }
+      return this;
+    }
+
+    abstract CollationMapper autoBuild();
+
+    /** Builds the {@link CollationMapper}. */
+    public CollationMapper build() {
+      return autoBuild();
+    }
+  }
+}
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationOrderRow.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationOrderRow.java
new file mode 100644
index 0000000000..f903978114
--- /dev/null
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationOrderRow.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper;
+
+import static com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationOrderRow.CollationsOrderQueryColumns.CHARSET_CHAR_COL;
+import static com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationOrderRow.CollationsOrderQueryColumns.CODEPOINT_RANK_COL;
+import static com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationOrderRow.CollationsOrderQueryColumns.CODEPOINT_RANK_PAD_SPACE_COL;
+import static com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationOrderRow.CollationsOrderQueryColumns.EQUIVALENT_CHARSET_CHAR_COL;
+import static com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationOrderRow.CollationsOrderQueryColumns.EQUIVALENT_CHARSET_CHAR_PAD_SPACE_COL;
+import static com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationOrderRow.CollationsOrderQueryColumns.IS_EMPTY_COL;
+import static com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationOrderRow.CollationsOrderQueryColumns.IS_SPACE_COL;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Preconditions;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a row of the minimum required columns from the collations output query. You can refer
+ * to src/test/resources/TestCollations/collation-output-mysql-utf8mb4-0900-ai-ci.tsv for example of
+ * the columns.
+ */
+@AutoValue
+public abstract class CollationOrderRow {
+
+  private static final Logger logger = LoggerFactory.getLogger(CollationOrderRow.class);
+
+  /** Character in the character set. */
+  public abstract Character charsetChar();
+
+  /** A character with lowest rank charset_char character is equal to as per the collation. */
+  public abstract Character equivalentChar();
+
+  /** 0 offset rank of this character as per the collation sort ordering at all positions. */
+  public abstract Long codepointRank();
+
+  /**
+   * A character with lowest rank charset_char character is equal to as per the collation at
+   * trailing position, in case a PAD SPACE comparison is needed. Unless you are looking at space
+   * like characters, this will be exactly same as equivalent_character.
+   */
+  public abstract Character equivalentCharPadSpace();
+
+  /**
+   * 0 offset rank of this character as per the collation sort ordering at trailing position in
+   * case a PAD SPACE comparison is needed.
+   */
+  public abstract Long codepointRankPadSpace();
+
+  /**
+   * A boolean column, true if the character is equal to '\0' at all positions of comparison. False
+   * otherwise.
+   */
+  public abstract Boolean isEmpty();
+
+  /**
+   * A boolean column, true if the character is equal to ' ' at all positions of comparison. False
+   * otherwise.
+   */
+  public abstract Boolean isSpace();
+
+  /** Returns a new builder for {@link CollationOrderRow}. */
+  public static Builder builder() {
+    return new AutoValue_CollationOrderRow.Builder();
+  }
+
+  /** Returns a builder initialized with the values of this row. */
+  public abstract Builder toBuilder();
+
+  /**
+   * Construct a {@link CollationOrderRow} from a result set for the collation order query. It is
+   * expected that the caller handles iteration of resultSet via {@link ResultSet#next()} and
+   * exceptions if any.
+   *
+   * @param rs result set positioned on the row to read. The columns named in {@link
+   *     CollationsOrderQueryColumns} are read from it.
+   * @return fields of the output enclosed in {@link CollationOrderRow}.
+   * @throws SQLException if thrown by the {@link ResultSet ResultSet api}.
+   * @throws IllegalArgumentException if any of the character columns is longer than one character.
+   */
+  public static CollationOrderRow fromRS(ResultSet rs) throws SQLException {
+
+    String charSetChar = rs.getString(CHARSET_CHAR_COL);
+    String equivalentCharsetChar = rs.getString(EQUIVALENT_CHARSET_CHAR_COL);
+    Long codePointRank = rs.getLong(CODEPOINT_RANK_COL);
+    Boolean isEmpty = rs.getBoolean(IS_EMPTY_COL);
+    Boolean isSpace = rs.getBoolean(IS_SPACE_COL);
+    String equivalentCharsetCharPadSpace = rs.getString(EQUIVALENT_CHARSET_CHAR_PAD_SPACE_COL);
+    Long codePointRankPadSpace = rs.getLong(CODEPOINT_RANK_PAD_SPACE_COL);
+
+    logger.debug(
+        "Going to register collation query row charSetChar = {} with length = {}, equivalentCharSetChar = {} with length - {}, codePointRank = {}, isEmpty = {} isSpace = {}",
+        charSetChar,
+        charSetChar.length(),
+        equivalentCharsetChar,
+        equivalentCharsetChar.length(),
+        codePointRank,
+        isEmpty,
+        isSpace);
+
+    // Each character column must hold a single character, as only charAt(0) is retained below.
+    // NOTE(review): these checks admit an empty string, for which charAt(0) below throws
+    // StringIndexOutOfBoundsException - confirm whether the query can ever return one.
+    Preconditions.checkArgument(
+        charSetChar.length() <= 1, "Found a long character in collation output " + charSetChar);
+    Preconditions.checkArgument(
+        equivalentCharsetChar.length() <= 1,
+        "Found a long equivalent character in collation output " + equivalentCharsetChar);
+    // Report the pad-space value that was actually checked, not the all-positions equivalent.
+    Preconditions.checkArgument(
+        equivalentCharsetCharPadSpace.length() <= 1,
+        "Found a long equivalent character for pad space in collation output "
+            + equivalentCharsetCharPadSpace);
+
+    return CollationOrderRow.builder()
+        .setCharsetChar(charSetChar.charAt(0))
+        .setEquivalentChar(equivalentCharsetChar.charAt(0))
+        .setCodepointRank(codePointRank)
+        .setEquivalentCharPadSpace(equivalentCharsetCharPadSpace.charAt(0))
+        .setCodepointRankPadSpace(codePointRankPadSpace)
+        .setIsEmpty(isEmpty)
+        .setIsSpace(isSpace)
+        .build();
+  }
+
+  /** Builder for {@link CollationOrderRow}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setCharsetChar(Character value);
+
+    public abstract Builder setEquivalentChar(Character value);
+
+    public abstract Builder setCodepointRank(Long value);
+
+    public abstract Builder setEquivalentCharPadSpace(Character value);
+
+    public abstract Builder setCodepointRankPadSpace(Long value);
+
+    public abstract Builder setIsEmpty(Boolean value);
+
+    public abstract Builder setIsSpace(Boolean value);
+
+    public abstract CollationOrderRow build();
+  }
+
+  /** Column names that must be returned by the collations order query. */
+  public static class CollationsOrderQueryColumns {
+
+    /** Character in the character set. */
+    public static final String CHARSET_CHAR_COL = "charset_char";
+
+    /** A character with lowest rank charset_char character is equal to as per the collation. */
+    public static final String EQUIVALENT_CHARSET_CHAR_COL = "equivalent_charset_char";
+
+    /**
+     * A boolean column, true if the character is equal to '\0' at all positions of comparison.
+     * False otherwise.
+     */
+    public static final String IS_EMPTY_COL = "is_empty";
+
+    /**
+     * A boolean column, true if the character is equal to ' ' at all positions of comparison.
+     * False otherwise.
+     */
+    public static final String IS_SPACE_COL = "is_space";
+
+    /** 0 offset rank of this character as per the collation sort ordering at all positions. */
+    public static final String CODEPOINT_RANK_COL = "codepoint_rank";
+
+    /**
+     * A character with lowest rank charset_char character is equal to as per the collation at
+     * trailing position, in case a PAD SPACE comparison is needed. Unless you are looking at space
+     * like characters, this will be exactly same as equivalent_character.
+     */
+    public static final String EQUIVALENT_CHARSET_CHAR_PAD_SPACE_COL =
+        "equivalent_charset_char_pad_space";
+
+    /**
+     * 0 offset rank of this character as per the collation sort ordering at trailing position in
+     * case a PAD SPACE comparison is needed.
+     */
+    public static final String CODEPOINT_RANK_PAD_SPACE_COL = "codepoint_rank_pad_space";
+
+    // Holder of constants only; not meant to be instantiated.
+    private CollationsOrderQueryColumns() {}
+  }
+}
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationReference.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationReference.java
new file mode 100644
index 0000000000..1bd625f101
--- /dev/null
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationReference.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+
+/**
+ * Reference to a collation: the database character set, the collation name, and whether a PAD
+ * SPACE comparison applies. This is an {@link AutoValue} value class; instances are created
+ * through {@link #builder()}.
+ */
+@AutoValue
+public abstract class CollationReference implements Serializable {
+
+ /** Character set of the column. */
+ public abstract String dbCharacterSet();
+
+ /** Collation of the column. */
+ public abstract String dbCollation();
+
+ /**
+ * Whether a PAD SPACE comparison applies. For MySql, set to true if collation is of the type PAD
+ * SPACE. For PG, set to true if column is of CHAR type.
+ */
+ public abstract Boolean padSpace();
+
+ public static Builder builder() {
+ return new AutoValue_CollationReference.Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setDbCharacterSet(String value);
+
+ public abstract Builder setDbCollation(String value);
+
+ public abstract Builder setPadSpace(Boolean value);
+
+ public abstract CollationReference build();
+ }
+}
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/package-info.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/package-info.java
new file mode 100644
index 0000000000..ae2ae5176a
--- /dev/null
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/package-info.java
@@ -0,0 +1 @@
+package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper;
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperDoFn.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperDoFn.java
new file mode 100644
index 0000000000..44e96b523e
--- /dev/null
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperDoFn.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.transforms;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.UniformSplitterDBAdapter;
+import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationMapper;
+import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import javax.annotation.Nullable;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Discover the Collation Mapping information for a given {@link CollationReference}. */
+public class CollationMapperDoFn
+ extends DoFn>
+ implements Serializable {
+
+ private static final Logger logger = LoggerFactory.getLogger(CollationMapper.class);
+ private final SerializableFunction dataSourceProviderFn;
+ private final UniformSplitterDBAdapter dbAdapter;
+
+ @JsonIgnore private transient @Nullable DataSource dataSource;
+
+ public CollationMapperDoFn(
+ SerializableFunction dataSourceProviderFn,
+ UniformSplitterDBAdapter dbAdapter) {
+ this.dataSourceProviderFn = dataSourceProviderFn;
+ this.dbAdapter = dbAdapter;
+ this.dataSource = null;
+ }
+
+ @Setup
+ public void setup() throws Exception {
+ dataSource = dataSourceProviderFn.apply(null);
+ }
+
+ private Connection acquireConnection() throws SQLException {
+ return checkStateNotNull(this.dataSource).getConnection();
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element CollationReference input,
+ OutputReceiver> out)
+ throws SQLException {
+
+ try (Connection conn = acquireConnection()) {
+ CollationMapper mapper = CollationMapper.fromDB(conn, dbAdapter, input);
+ out.output(KV.of(input, mapper));
+ } catch (Exception e) {
+ logger.error(
+ "Exception: {} while generating collationMapper for dataSource: {}, collationReference: {}",
+ e,
+ dataSource,
+ input);
+ // Beam will re-try exceptions generation during pipeline-run phase.
+ throw e;
+ }
+ }
+}
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperTransform.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperTransform.java
new file mode 100644
index 0000000000..96424207c8
--- /dev/null
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/CollationMapperTransform.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.UniformSplitterDBAdapter;
+import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationMapper;
+import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Generate the Side-Input that encodes the Collation Mapping information for given instance of
+ * {@link ReadWithUniformPartitions}.
+ */
+@AutoValue
+public abstract class CollationMapperTransform
+ extends PTransform>>
+ implements Serializable {
+
+ /** List of {@link CollationReference} to discover the mapping for. */
+ public abstract ImmutableList collationReferences();
+
+ /** Provider for connection pool. */
+ public abstract SerializableFunction dataSourceProviderFn();
+
+ /** Provider to dialect specific Collation mapping query. */
+ public abstract UniformSplitterDBAdapter dbAdapter();
+
+ /**
+ * Generate the Side-Input that encodes the Collation Mapping information for given instance of
+ * {@link ReadWithUniformPartitions}.
+ *
+ * @param input PBegin
+ * @return {@link PCollectionView} for discovered {@link CollationReference}, {@link
+ * CollationMapper} pairs.
+ */
+ @Override
+ public PCollectionView