Skip to content

Commit

Permalink
Spanner change streams to BigQuery Dataflow template: Respect the pri…
Browse files Browse the repository at this point in the history
…mary key ordinal position when doing stale read for UPDATE mod

PiperOrigin-RevId: 458555061
  • Loading branch information
cloud-teleport committed Jul 1, 2022
1 parent 83e6c1d commit 67d0303
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,31 @@
import java.io.Serializable;

/**
* The {@link TrackedSpannerColumn} contains the name, type and ordinal position of a Spanner column
* tracked by a change stream.
* The {@link TrackedSpannerColumn} contains the name, type and ordinal positions of a Spanner
* column tracked by a change stream.
*/
@AutoValue
public abstract class TrackedSpannerColumn implements Serializable {

public static TrackedSpannerColumn create(String name, Type type, int ordinalPosition) {
return new AutoValue_TrackedSpannerColumn(name, type, ordinalPosition);
public static TrackedSpannerColumn create(
String name, Type type, int ordinalPosition, int pkOrdinalPosition) {
return new AutoValue_TrackedSpannerColumn(name, type, ordinalPosition, pkOrdinalPosition);
}

public abstract String getName();

public abstract Type getType();

public abstract int getOrdinalPosition();

// The ordinal position of the primary key, this is set to -1 for non-primary key.
// Why is ordinal position of the primary key necessary? Consider the following table schema:
// CREATE TABLE Foo (A INT64, B INT64) PRIMARY KEY (B, A).
// 1) The ordinal positions of all columns from INFORMATION_SCHEMA.COLUMNS tables are: A = 1, B =
// 2.
// 2) The ordinal positions of primary key columns from INFORMATION_SCHEMA.KEY_COLUMN_USAGE tables
// are: A = 2, B = 1.
// Since we need to set primary key columns when we make Spanner stale read, we cannot simply rely
// on ordinal positions from #1, since #1 and #2 are different.
public abstract int getPkOrdinalPosition();
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public final class TrackedSpannerTable implements Serializable {
/** Default constructor for serialization only. */
public TrackedSpannerTable() {}

private static class SortByPkOrdinalPosition implements Comparator<TrackedSpannerColumn> {
public int compare(TrackedSpannerColumn o1, TrackedSpannerColumn o2) {
return Integer.compare(o1.getPkOrdinalPosition(), o2.getPkOrdinalPosition());
}
}

private static class SortByOrdinalPosition implements Comparator<TrackedSpannerColumn> {
public int compare(TrackedSpannerColumn o1, TrackedSpannerColumn o2) {
return Integer.compare(o1.getOrdinalPosition(), o2.getOrdinalPosition());
Expand All @@ -53,7 +59,8 @@ public TrackedSpannerTable(
List<TrackedSpannerColumn> nonPkColumns) {
this.pkColumns = new ArrayList<>(pkColumns);
this.nonPkColumns = new ArrayList<>(nonPkColumns);
Collections.sort(this.pkColumns, new SortByOrdinalPosition());
// Sort the primary key column by primary key oridinal position.
Collections.sort(this.pkColumns, new SortByPkOrdinalPosition());
Collections.sort(this.nonPkColumns, new SortByOrdinalPosition());
this.tableName = tableName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,22 @@ public Map<String, TrackedSpannerTable> getSpannerTableByName() {
private Map<String, TrackedSpannerTable> getSpannerTableByName(
Set<String> spannerTableNames,
Map<String, Set<String>> spannerColumnNamesExplicitlyTrackedByChangeStreamByTableName) {
Map<String, Set<String>> keyColumnNameByTableName =
getKeyColumnNameByTableName(spannerTableNames);
Map<String, Map<String, Integer>> keyColumnNameToOrdinalPositionByTableName =
getKeyColumnNameToOrdinalPositionByTableName(spannerTableNames);
Map<String, List<TrackedSpannerColumn>> spannerColumnsByTableName =
getSpannerColumnsByTableName(
spannerTableNames,
keyColumnNameByTableName,
keyColumnNameToOrdinalPositionByTableName,
spannerColumnNamesExplicitlyTrackedByChangeStreamByTableName);

Map<String, TrackedSpannerTable> result = new HashMap<>();
for (String tableName : spannerColumnsByTableName.keySet()) {
List<TrackedSpannerColumn> pkColumns = new ArrayList<>();
List<TrackedSpannerColumn> nonPkColumns = new ArrayList<>();
Set<String> keyColumnNames = keyColumnNameByTableName.get(tableName);
Map<String, Integer> keyColumnNameToOrdinalPosition =
keyColumnNameToOrdinalPositionByTableName.get(tableName);
for (TrackedSpannerColumn spannerColumn : spannerColumnsByTableName.get(tableName)) {
if (keyColumnNames.contains(spannerColumn.getName())) {
if (keyColumnNameToOrdinalPosition.containsKey(spannerColumn.getName())) {
pkColumns.add(spannerColumn);
} else {
nonPkColumns.add(spannerColumn);
Expand All @@ -106,7 +107,7 @@ private Map<String, TrackedSpannerTable> getSpannerTableByName(
*/
private Map<String, List<TrackedSpannerColumn>> getSpannerColumnsByTableName(
Set<String> spannerTableNames,
Map<String, Set<String>> keyColumnNameByTableName,
Map<String, Map<String, Integer>> keyColumnNameToOrdinalPositionByTableName,
Map<String, Set<String>> spannerColumnNamesExplicitlyTrackedByChangeStreamByTableName) {
Map<String, List<TrackedSpannerColumn>> result = new HashMap<>();
StringBuilder sqlStringBuilder =
Expand Down Expand Up @@ -135,17 +136,34 @@ private Map<String, List<TrackedSpannerColumn>> getSpannerColumnsByTableName(
&& !spannerColumnNamesExplicitlyTrackedByChangeStreamByTableName
.get(tableName)
.contains(columnName)
&& (!keyColumnNameByTableName.containsKey(tableName)
|| !keyColumnNameByTableName.get(tableName).contains(columnName))) {
&& (!keyColumnNameToOrdinalPositionByTableName.containsKey(tableName)
|| !keyColumnNameToOrdinalPositionByTableName
.get(tableName)
.containsKey(columnName))) {
continue;
}

int ordinalPosition = (int) columnsResultSet.getLong(INFORMATION_SCHEMA_ORDINAL_POSITION);
String spannerType = columnsResultSet.getString(INFORMATION_SCHEMA_SPANNER_TYPE);
result.putIfAbsent(tableName, new ArrayList<>());
// Set primary key ordinal position for primary key column.
int pkOrdinalPosition = -1;
if (!keyColumnNameToOrdinalPositionByTableName.containsKey(tableName)) {
throw new IllegalArgumentException(
String.format(
"Cannot find key column for change stream %s and table %s",
changeStreamName, tableName));
}
if (keyColumnNameToOrdinalPositionByTableName.get(tableName).containsKey(columnName)) {
pkOrdinalPosition =
keyColumnNameToOrdinalPositionByTableName.get(tableName).get(columnName);
}
TrackedSpannerColumn spannerColumn =
TrackedSpannerColumn.create(
columnName, informationSchemaTypeToSpannerType(spannerType), ordinalPosition);
columnName,
informationSchemaTypeToSpannerType(spannerType),
ordinalPosition,
pkOrdinalPosition);
result.get(tableName).add(spannerColumn);
}
}
Expand All @@ -160,11 +178,12 @@ private Map<String, List<TrackedSpannerColumn>> getSpannerColumnsByTableName(
* information from {@link Mod} whenever we process it, but it's less efficient, since that will
* require to parse the types and sort them based on the ordinal positions for each {@link Mod}.
*/
private Map<String, Set<String>> getKeyColumnNameByTableName(Set<String> spannerTableNames) {
Map<String, Set<String>> result = new HashMap<>();
private Map<String, Map<String, Integer>> getKeyColumnNameToOrdinalPositionByTableName(
Set<String> spannerTableNames) {
Map<String, Map<String, Integer>> result = new HashMap<>();
StringBuilder sqlStringBuilder =
new StringBuilder(
"SELECT TABLE_NAME, COLUMN_NAME, CONSTRAINT_NAME FROM"
"SELECT TABLE_NAME, COLUMN_NAME, ORDINAL_POSITION, CONSTRAINT_NAME FROM"
+ " INFORMATION_SCHEMA.KEY_COLUMN_USAGE");

// Skip the tables that are not tracked by change stream.
Expand All @@ -182,11 +201,15 @@ private Map<String, Set<String>> getKeyColumnNameByTableName(Set<String> spanner
while (keyColumnsResultSet.next()) {
String tableName = keyColumnsResultSet.getString(INFORMATION_SCHEMA_TABLE_NAME);
String columnName = keyColumnsResultSet.getString(INFORMATION_SCHEMA_COLUMN_NAME);
// Note The ordinal position of primary key in INFORMATION_SCHEMA.KEY_COLUMN_USAGE table
// is different from the ordinal position from INFORMATION_SCHEMA.COLUMNS table.
int ordinalPosition =
(int) keyColumnsResultSet.getLong(INFORMATION_SCHEMA_ORDINAL_POSITION);
String constraintName = keyColumnsResultSet.getString(INFORMATION_SCHEMA_CONSTRAINT_NAME);
// We are only interested in primary key constraint.
if (isPrimaryKey(constraintName)) {
result.putIfAbsent(tableName, new HashSet<>());
result.get(tableName).add(columnName);
result.putIfAbsent(tableName, new HashMap<>());
result.get(tableName).put(columnName, ordinalPosition);
}
}
}
Expand Down
Loading

0 comments on commit 67d0303

Please sign in to comment.