Skip to content

Commit

Permalink
Unit tests for reverse replication shadow tables (#1759)
Browse files Browse the repository at this point in the history
* Unit tests for reverse replication shadow tables

* incorporated review comments
  • Loading branch information
aksharauke authored and dhercher committed Aug 1, 2024
1 parent 311f85f commit 045b3a9
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;

/** Helper class to create shadow tables for different source types. */
/** Helper class to create shadow tables in the metadata database. */
public class ShadowTableCreator {

private final SpannerAccessor spannerAccessor;
Expand All @@ -47,6 +47,8 @@ public class ShadowTableCreator {
private final SpannerConfig spannerConfig;
private final SpannerConfig metadataConfig;
private String shadowTablePrefix;
private Ddl informationSchemaOfPrimaryDb;
private Ddl informationSchemaOfMetadataDb;

public ShadowTableCreator(
SpannerConfig spannerConfig,
Expand All @@ -60,20 +62,49 @@ public ShadowTableCreator(
this.spannerConfig = spannerConfig;
this.metadataConfig = metadataConfig;
this.shadowTablePrefix = shadowTablePrefix;
setinformationSchemaOfPrimaryDb();
setInformationSchemaOfMetadataDb();
}

public void createShadowTablesInSpanner() {

private void setinformationSchemaOfPrimaryDb() {
BatchClient batchClient = spannerAccessor.getBatchClient();
BatchReadOnlyTransaction context =
batchClient.batchReadOnlyTransaction(TimestampBound.strong());
InformationSchemaScanner scanner = new InformationSchemaScanner(context, dialect);
Ddl informationSchema = scanner.scan();
List<String> dataTablesWithoutShadowTables = getDataTablesWithNoShadowTables(informationSchema);
this.informationSchemaOfPrimaryDb = scanner.scan();
}

/**
 * Scans the information schema of the metadata database and stores the resulting {@link Ddl} in
 * {@code informationSchemaOfMetadataDb}.
 *
 * <p>The scan runs inside a strong-read batch read-only transaction obtained from {@code
 * metadataSpannerAccessor}. The transaction is opened in a try-with-resources statement so it is
 * closed as soon as the scan returns, including when the scan throws.
 */
private void setInformationSchemaOfMetadataDb() {
  BatchClient batchClient = metadataSpannerAccessor.getBatchClient();
  // NOTE(review): closing here assumes scan() returns a fully built Ddl that no longer needs the
  // transaction - confirm against InformationSchemaScanner.
  try (BatchReadOnlyTransaction context =
      batchClient.batchReadOnlyTransaction(TimestampBound.strong())) {
    InformationSchemaScanner scanner = new InformationSchemaScanner(context, dialect);
    this.informationSchemaOfMetadataDb = scanner.scan();
  }
}

/**
 * Constructor intended for unit tests.
 *
 * <p>Both information schemas are supplied directly by the caller instead of being scanned, and
 * this constructor makes no Spanner calls. The accessor and config fields are set to {@code
 * null}.
 *
 * @param dialect dialect stored on this instance
 * @param shadowTablePrefix prefix stored on this instance for shadow table names
 * @param informationSchemaOfPrimaryDb pre-built schema used as the primary database schema
 * @param informationSchemaOfMetadataDb pre-built schema used as the metadata database schema
 */
public ShadowTableCreator(
    Dialect dialect,
    String shadowTablePrefix,
    Ddl informationSchemaOfPrimaryDb,
    Ddl informationSchemaOfMetadataDb) {
  // Nothing here talks to Spanner, so every connection-related field stays unset.
  this.spannerConfig = null;
  this.metadataConfig = null;
  this.spannerAccessor = null;
  this.metadataSpannerAccessor = null;

  // State supplied by the caller.
  this.dialect = dialect;
  this.shadowTablePrefix = shadowTablePrefix;
  this.informationSchemaOfPrimaryDb = informationSchemaOfPrimaryDb;
  this.informationSchemaOfMetadataDb = informationSchemaOfMetadataDb;
}

public void createShadowTablesInSpanner() {

List<String> dataTablesWithoutShadowTables = getDataTablesWithNoShadowTables();

Ddl.Builder shadowTableBuilder = Ddl.builder(dialect);
for (String dataTableName : dataTablesWithoutShadowTables) {
Table shadowTable = constructShadowTable(informationSchema, dataTableName, dialect);
Table shadowTable = constructShadowTable(dataTableName);
shadowTableBuilder.addTable(shadowTable);
}
List<String> createShadowTableStatements = shadowTableBuilder.build().createTableStatements();
Expand Down Expand Up @@ -104,15 +135,15 @@ public void createShadowTablesInSpanner() {
* Note: Shadow tables for interleaved tables are not interleaved to
* their shadow parent table.
*/
Table constructShadowTable(Ddl informationSchema, String dataTableName, Dialect dialect) {
Table constructShadowTable(String dataTableName) {

// Create a new shadow table with the given prefix.
Table.Builder shadowTableBuilder = Table.builder(dialect);
String shadowTableName = shadowTablePrefix + dataTableName;
shadowTableBuilder.name(shadowTableName);

// Add key columns from the data table to the shadow table builder.
Table dataTable = informationSchema.table(dataTableName);
Table dataTable = informationSchemaOfPrimaryDb.table(dataTableName);
Set<String> primaryKeyColNames =
dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet());
List<Column> primaryKeyCols =
Expand Down Expand Up @@ -146,20 +177,17 @@ Table constructShadowTable(Ddl informationSchema, String dataTableName, Dialect
/*
* Returns the list of data table names that don't have a corresponding shadow table.
*/
List<String> getDataTablesWithNoShadowTables(Ddl ddl) {
List<String> getDataTablesWithNoShadowTables() {
// Get the list of shadow tables in the information schema based on the prefix.
Set<String> existingShadowTables = getShadowTablesInDdl(ddl);
Set<String> existingShadowTables = getShadowTablesInDdl(informationSchemaOfPrimaryDb);

List<String> allTables =
ddl.allTables().stream().map(t -> t.name()).collect(Collectors.toList());
informationSchemaOfPrimaryDb.allTables().stream()
.map(t -> t.name())
.collect(Collectors.toList());

BatchClient batchClient = metadataSpannerAccessor.getBatchClient();
BatchReadOnlyTransaction context =
batchClient.batchReadOnlyTransaction(TimestampBound.strong());
InformationSchemaScanner scanner = new InformationSchemaScanner(context, dialect);
Ddl metadataDbinformationSchema = scanner.scan();
Set<String> existingShadowTablesInMetadataDb =
getShadowTablesInDdl(metadataDbinformationSchema);
getShadowTablesInDdl(informationSchemaOfMetadataDb);
/*
* Filter out the following from the list of all table names to get the list of
* data tables which do not have corresponding shadow tables:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.utils;

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.spanner.Dialect;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
import com.google.cloud.teleport.v2.spanner.ddl.Table;
import com.google.cloud.teleport.v2.templates.constants.Constants;
import com.google.common.collect.ImmutableList;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Unit tests for {@link ShadowTableCreator}, driven by in-memory {@link Ddl} fixtures rather than
 * a live Spanner database.
 */
@RunWith(JUnit4.class)
public final class ShadowTableCreatorTest {

  /**
   * Checks which data tables are reported as lacking a shadow table, and the shape (name, columns,
   * primary key) of the shadow tables built for them.
   */
  @Test
  public void testShadowTableCreated() {
    ShadowTableCreator creator =
        new ShadowTableCreator(
            Dialect.GOOGLE_STANDARD_SQL, "shadow_", buildPrimaryDbDdl(), buildMetadataDbDdl());

    // Of the four primary-db tables, only table1 and table4 are expected to need a shadow table.
    List<String> expectedMissing = ImmutableList.of("table1", "table4");
    List<String> actualMissing = creator.getDataTablesWithNoShadowTables();
    assertThat(actualMissing).containsExactlyElementsIn(expectedMissing);

    // Single-column primary key.
    Table shadowOfTable1 = creator.constructShadowTable("table1");
    assertThat(shadowOfTable1.name()).isEqualTo("shadow_table1");
    assertColumnNames(shadowOfTable1, "id", Constants.PROCESSED_COMMIT_TS_COLUMN_NAME);
    assertAscendingPrimaryKey(shadowOfTable1, "id");

    // Composite primary key.
    Table shadowOfTable4 = creator.constructShadowTable("table4");
    assertThat(shadowOfTable4.name()).isEqualTo("shadow_table4");
    assertColumnNames(shadowOfTable4, "id4", "id5", Constants.PROCESSED_COMMIT_TS_COLUMN_NAME);
    assertAscendingPrimaryKey(shadowOfTable4, "id4", "id5");
  }

  /** Asserts that {@code table} has exactly the given column names, in the given order. */
  private static void assertColumnNames(Table table, String... expectedNames) {
    assertThat(table.columns()).hasSize(expectedNames.length);
    for (int i = 0; i < expectedNames.length; i++) {
      assertThat(table.columns().get(i).name()).isEqualTo(expectedNames[i]);
    }
  }

  /**
   * Asserts that the primary key of {@code table} consists of exactly the given columns, in the
   * given order, each sorted ascending.
   */
  private static void assertAscendingPrimaryKey(Table table, String... expectedKeyNames) {
    assertThat(table.primaryKeys()).hasSize(expectedKeyNames.length);
    for (int i = 0; i < expectedKeyNames.length; i++) {
      assertThat(table.primaryKeys().get(i).name()).isEqualTo(expectedKeyNames[i]);
      assertThat(table.primaryKeys().get(i).order()).isEqualTo(IndexColumn.Order.ASC);
    }
  }

  /**
   * Builds the primary database fixture: {@code table1}, {@code table2}, {@code shadow_table3} and
   * {@code table4} (the last one with a two-column primary key).
   */
  private static Ddl buildPrimaryDbDdl() {
    return Ddl.builder()
        .createTable("table1")
        .column("id")
        .int64()
        .endColumn()
        .column("update_ts")
        .timestamp()
        .endColumn()
        .primaryKey()
        .asc("id")
        .end()
        .endTable()
        .createTable("table2")
        .column("id2")
        .int64()
        .endColumn()
        .column("update_ts")
        .timestamp()
        .endColumn()
        .primaryKey()
        .asc("id2")
        .end()
        .endTable()
        .createTable("shadow_table3")
        .column("id3")
        .int64()
        .endColumn()
        .column("shadow_update_ts")
        .timestamp()
        .endColumn()
        .primaryKey()
        .asc("id3")
        .end()
        .endTable()
        .createTable("table4")
        .column("id4")
        .int64()
        .endColumn()
        .column("id5")
        .int64()
        .endColumn()
        .column("shadow_update_ts")
        .timestamp()
        .endColumn()
        .primaryKey()
        .asc("id4")
        .asc("id5")
        .end()
        .endTable()
        .build();
  }

  /** Builds the metadata database fixture, which contains only {@code shadow_table2}. */
  private static Ddl buildMetadataDbDdl() {
    return Ddl.builder()
        .createTable("shadow_table2")
        .column("id2")
        .int64()
        .endColumn()
        .column("shadow_update_ts")
        .timestamp()
        .endColumn()
        .primaryKey()
        .asc("id2")
        .end()
        .endTable()
        .build();
  }
}

0 comments on commit 045b3a9

Please sign in to comment.