Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modified BigQueryDynamicDestination destination to support write API with exactly once semantics #1885

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,37 @@
*/
package com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SpannerToBigQueryUtils;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,7 +55,7 @@
* destination table is inferred from the provided {@link TableRow}.
*/
public final class BigQueryDynamicDestinations
extends DynamicDestinations<TableRow, KV<TableId, TableRow>> {
extends DynamicDestinations<TableRow, KV<String, List<TableFieldSchema>>> {

private static final Logger LOG = LoggerFactory.getLogger(BigQueryDynamicDestinations.class);

Expand All @@ -68,35 +78,34 @@
this.useStorageWriteApi = bigQueryDynamicDestinationsOptions.getUseStorageWriteApi();
}

private TableId getTableId(String bigQueryTableTemplate, TableRow tableRow) {
private String getTableName(TableRow tableRow) {
String bigQueryTableName =
BigQueryConverters.formatStringTemplate(bigQueryTableTemplate, tableRow);

return TableId.of(bigQueryProject, bigQueryDataset, bigQueryTableName);
return String.format("%s:%s.%s", bigQueryProject, bigQueryDataset, bigQueryTableName);

Check warning on line 85 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L85

Added line #L85 was not covered by tests
}

@Override
public KV<TableId, TableRow> getDestination(ValueInSingleWindow<TableRow> element) {
TableRow tableRow = element.getValue();
return KV.of(getTableId(bigQueryTableTemplate, tableRow), tableRow);
public Coder<KV<String, List<TableFieldSchema>>> getDestinationCoder() {
dedocibula marked this conversation as resolved.
Show resolved Hide resolved
return KvCoder.of(StringUtf8Coder.of(), ListCoder.of(TableFieldSchemaCoder.of()));

Check warning on line 90 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L90

Added line #L90 was not covered by tests
}

@Override
public TableDestination getTable(KV<TableId, TableRow> destination) {
TableId tableId = getTableId(bigQueryTableTemplate, destination.getValue());
String tableName =
String.format("%s:%s.%s", tableId.getProject(), tableId.getDataset(), tableId.getTable());
public KV<String, List<TableFieldSchema>> getDestination(ValueInSingleWindow<TableRow> element) {
TableRow tableRow = element.getValue();

Check warning on line 95 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L95

Added line #L95 was not covered by tests
// Get List<TableFieldSchema> for both user columns and metadata columns.
return KV.of(getTableName(tableRow), getFields(tableRow));

Check warning on line 97 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L97

Added line #L97 was not covered by tests
}

return new TableDestination(tableName, "BigQuery changelog table.");
@Override
public TableDestination getTable(KV<String, List<TableFieldSchema>> destination) {
return new TableDestination(destination.getKey(), "BigQuery changelog table.");

Check warning on line 102 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L102

Added line #L102 was not covered by tests
}

@Override
public TableSchema getSchema(KV<TableId, TableRow> destination) {
TableRow tableRow = destination.getValue();
// Get List<TableFieldSchema> for both user columns and metadata columns.
List<TableFieldSchema> fields = getFields(tableRow);
public TableSchema getSchema(KV<String, List<TableFieldSchema>> destination) {
List<TableFieldSchema> filteredFields = new ArrayList<>();
for (TableFieldSchema field : fields) {
for (TableFieldSchema field : destination.getValue()) {
if (!ignoreFields.contains(field.getName())) {
filteredFields.add(field);
}
Expand All @@ -107,7 +116,7 @@

// Returns List<TableFieldSchema> for user columns and metadata columns based on the parameter
// TableRow.
private List<TableFieldSchema> getFields(TableRow tableRow) {
List<TableFieldSchema> getFields(TableRow tableRow) {
// Add all user data fields (excluding metadata fields stored in metadataColumns).
List<TableFieldSchema> fields =
SpannerToBigQueryUtils.tableRowColumnsToBigQueryIOFields(tableRow, this.useStorageWriteApi);
Expand Down Expand Up @@ -214,4 +223,58 @@
DatabaseClient databaseClient = SpannerAccessor.getOrCreate(spannerConfig).getDatabaseClient();
return databaseClient.getDialect();
}

/**
* {@link TableFieldSchemaCoder} provides a custom coder for TableFieldSchema with deterministic
* serialization. This coder is intended only for use within this file, where TableFieldSchema
* objects are created deterministically by the
* {@link BigQueryDynamicDestinations#getFields(TableRow)} method.
*/
private static class TableFieldSchemaCoder extends AtomicCoder<TableFieldSchema> {

private static final ObjectMapper OBJECT_MAPPER;
private static final TypeDescriptor<TableFieldSchema> TYPE_DESCRIPTOR;
private static final TableFieldSchemaCoder INSTANCE;

static {
OBJECT_MAPPER =

Check warning on line 240 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L240

Added line #L240 was not covered by tests
new ObjectMapper()
.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
TYPE_DESCRIPTOR = new TypeDescriptor<TableFieldSchema>() {};
INSTANCE = new TableFieldSchemaCoder();
}

Check warning on line 246 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L242-L246

Added lines #L242 - L246 were not covered by tests

private static TableFieldSchemaCoder of() {
return INSTANCE;

Check warning on line 249 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L249

Added line #L249 was not covered by tests
}

// Private on purpose: the only instance is the INSTANCE singleton, handed out through of().
private TableFieldSchemaCoder() {}

/**
 * Writes {@code value} to {@code outStream} as a JSON string: the value is serialized with the
 * shared {@code OBJECT_MAPPER} and the resulting text is encoded by {@link StringUtf8Coder}.
 *
 * @param value the field schema to encode
 * @param outStream the stream that receives the encoded bytes
 * @throws IOException if JSON serialization or the write to {@code outStream} fails
 */
@Override
public void encode(TableFieldSchema value, OutputStream outStream) throws IOException {
  StringUtf8Coder.of().encode(OBJECT_MAPPER.writeValueAsString(value), outStream);
}

Check warning on line 258 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L256-L258

Added lines #L256 - L258 were not covered by tests

@Override
public TableFieldSchema decode(InputStream inStream) throws IOException {
String strValue = StringUtf8Coder.of().decode(inStream);
return OBJECT_MAPPER.readValue(strValue, TableFieldSchema.class);

Check warning on line 263 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L262-L263

Added lines #L262 - L263 were not covered by tests
}

@Override
public long getEncodedElementByteSize(TableFieldSchema value) throws Exception {
String strValue = OBJECT_MAPPER.writeValueAsString(value);
return StringUtf8Coder.of().getEncodedElementByteSize(strValue);

Check warning on line 269 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L268-L269

Added lines #L268 - L269 were not covered by tests
}

@Override
public TypeDescriptor<TableFieldSchema> getEncodedTypeDescriptor() {
return TYPE_DESCRIPTOR;

Check warning on line 274 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L274

Added line #L274 was not covered by tests
}

/**
 * Intentionally a no-op: it never throws {@link Coder.NonDeterministicException}, which declares
 * this coder deterministic to callers.
 *
 * <p>Encoding goes through {@code OBJECT_MAPPER}, which is configured with
 * {@code SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS}. NOTE(review): determinism also relies
 * on the encoded TableFieldSchema objects being built the same way every time; confirm this
 * still holds if the coder is ever used for values not produced by this file.
 */
@Override
public void verifyDeterministic() throws Coder.NonDeterministicException {}

Check warning on line 278 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L278

Added line #L278 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,15 @@
import static com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.TestUtils.dropSpannerDatabase;
import static com.google.common.truth.Truth.assertThat;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.Timestamp;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.teleport.v2.spanner.IntegrationTest;
import com.google.cloud.teleport.v2.spanner.SpannerServerResource;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.BigQueryDynamicDestinations.BigQueryDynamicDestinationsOptions;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
Expand All @@ -99,7 +100,7 @@ public final class BigQueryDynamicDestinationsTest {

private static BigQueryDynamicDestinations bigQueryDynamicDestinations;
private static TableRow tableRow;
private static KV<TableId, TableRow> tableIdToTableRow;
private static KV<String, List<TableFieldSchema>> tableNameToFields;
private static String spannerDatabaseName;

private static final String typePrefix = "_type_";
Expand All @@ -125,10 +126,12 @@ public static void before() throws Exception {

tableRow = new TableRow();
tableRow.set(BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME, TEST_SPANNER_TABLE);
tableIdToTableRow =
tableNameToFields =
KV.of(
TableId.of(TEST_PROJECT, TEST_BIG_QUERY_DATESET, TEST_SPANNER_TABLE + "_changelog"),
tableRow);
String.format(
"%s:%s.%s",
TEST_PROJECT, TEST_BIG_QUERY_DATESET, TEST_SPANNER_TABLE + "_changelog"),
bigQueryDynamicDestinations.getFields(tableRow));
}

@AfterClass
Expand Down Expand Up @@ -202,17 +205,20 @@ public void testGetDestination() {
PaneInfo paneInfo = PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME);
ValueInSingleWindow<TableRow> tableRowValueInSingleWindow =
ValueInSingleWindow.of(tableRow, timestamp, GlobalWindow.INSTANCE, paneInfo);
TableRow expectedTableRow = new TableRow();
TableId expectedTableId =
TableId.of(TEST_PROJECT, TEST_BIG_QUERY_DATESET, TEST_SPANNER_TABLE + "_changelog");
expectedTableRow.set(BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME, TEST_SPANNER_TABLE);
TableRow tableRow = new TableRow();
tableRow.set(BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME, TEST_SPANNER_TABLE);
assertThat(bigQueryDynamicDestinations.getDestination(tableRowValueInSingleWindow))
.isEqualTo(KV.of(expectedTableId, expectedTableRow));
.isEqualTo(
KV.of(
String.format(
"%s:%s.%s",
TEST_PROJECT, TEST_BIG_QUERY_DATESET, TEST_SPANNER_TABLE + "_changelog"),
bigQueryDynamicDestinations.getFields(tableRow)));
}

@Test
public void testGetTable() {
assertThat(bigQueryDynamicDestinations.getTable(tableIdToTableRow).toString())
assertThat(bigQueryDynamicDestinations.getTable(tableNameToFields).toString())
.isEqualTo(
"tableSpec: span-cloud-testing:dataset.AllTypes_changelog tableDescription: BigQuery"
+ " changelog table.");
Expand All @@ -223,14 +229,18 @@ public void testGetTable() {
@Test
public void testGetSchema() {
fillTableRow();
tableIdToTableRow =
tableNameToFields =
KV.of(
TableId.of(TEST_PROJECT, TEST_BIG_QUERY_DATESET, TEST_SPANNER_TABLE + "_changelog"),
tableRow);
String schemaStr = bigQueryDynamicDestinations.getSchema(tableIdToTableRow).toString();
String.format(
"%s:%s.%s",
TEST_PROJECT, TEST_BIG_QUERY_DATESET, TEST_SPANNER_TABLE + "_changelog"),
bigQueryDynamicDestinations.getFields(tableRow));
String schemaStr = bigQueryDynamicDestinations.getSchema(tableNameToFields).toString();
schemaStr =
schemaStr.replace(
"classInfo=[categories, collation, defaultValueExpression, description, fields, maxLength, mode, name, policyTags, precision, rangeElementType, roundingMode, scale, type], ",
"classInfo=[categories, collation, defaultValueExpression, description, fields,"
+ " maxLength, mode, name, policyTags, precision, rangeElementType, roundingMode,"
+ " scale, type], ",
"");
schemaStr = schemaStr.replace("GenericData", "");
assertThat(schemaStr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,83 @@ public void testSpannerChangeStreamsToBigQueryBasic() throws IOException {
}
}

/**
 * End-to-end check of the template with the BigQuery Storage Write API enabled and at-least-once
 * mode disabled ({@code useStorageWriteApi=true}, {@code useStorageWriteApiAtLeastOnce=false}).
 *
 * <p>Creates a Spanner table holding one row plus a change stream over it, launches the template,
 * updates the row's {@code LastName}, and then expects the changelog query to return exactly one
 * row carrying the original {@code FirstName} and the updated {@code LastName}.
 */
@Test
public void testSpannerChangeStreamsToBigQueryBasicWriteApiExactlyOnce() throws IOException {
  // Source table in Spanner and the name of the changelog table the template writes to.
  String sourceTable = testName + RandomStringUtils.randomAlphanumeric(1, 5);
  String changelogTable = sourceTable + "_changelog";
  spannerResourceManager.executeDdlStatement(
      String.format(
          "CREATE TABLE %s (\n"
              + " Id INT64 NOT NULL,\n"
              + " FirstName String(1024),\n"
              + " LastName String(1024),\n"
              + ") PRIMARY KEY(Id)",
          sourceTable));

  // Seed a single row before the change stream exists.
  int rowId = nextValue();
  String initialFirstName = UUID.randomUUID().toString();
  String initialLastName = UUID.randomUUID().toString();
  spannerResourceManager.write(
      Collections.singletonList(
          Mutation.newInsertBuilder(sourceTable)
              .set("Id")
              .to(rowId)
              .set("FirstName")
              .to(initialFirstName)
              .set("LastName")
              .to(initialLastName)
              .build()));

  // Change stream over the source table, and the destination dataset.
  spannerResourceManager.executeDdlStatement(
      String.format("CREATE CHANGE STREAM %s_stream FOR %s", testName, sourceTable));
  bigQueryResourceManager.createDataset(REGION);

  Function<LaunchConfig.Builder, LaunchConfig.Builder> paramsAdder = Function.identity();

  // Storage Write API on, at-least-once off.
  LaunchConfig.Builder launchOptions =
      LaunchConfig.builder(testName, specPath)
          .addParameter("spannerProjectId", PROJECT)
          .addParameter("spannerInstanceId", spannerResourceManager.getInstanceId())
          .addParameter("spannerDatabase", spannerResourceManager.getDatabaseId())
          .addParameter("spannerMetadataInstanceId", spannerResourceManager.getInstanceId())
          .addParameter("spannerMetadataDatabase", spannerResourceManager.getDatabaseId())
          .addParameter("spannerChangeStreamName", testName + "_stream")
          .addParameter("bigQueryDataset", bigQueryResourceManager.getDatasetId())
          .addParameter("rpcPriority", "HIGH")
          .addParameter("useStorageWriteApi", "true")
          .addParameter("useStorageWriteApiAtLeastOnce", "false")
          .addParameter("numStorageWriteApiStreams", "1")
          .addParameter("storageWriteApiTriggeringFrequencySec", "10")
          .addParameter("dlqRetryMinutes", "3");
  launchInfo = launchTemplate(paramsAdder.apply(launchOptions));

  assertThatPipeline(launchInfo).isRunning();

  // Update the seeded row once the pipeline is running so the change stream captures it.
  String newLastName = UUID.randomUUID().toString();
  spannerResourceManager.write(
      Collections.singletonList(
          Mutation.newUpdateBuilder(sourceTable)
              .set("Id")
              .to(rowId)
              .set("LastName")
              .to(newLastName)
              .build()));

  String changelogQuery = queryCdcTable(changelogTable, rowId);
  waitForQueryToReturnRows(changelogQuery, 1, true);

  // Exactly one changelog row is expected for the updated key.
  TableResult changelogRows = bigQueryResourceManager.runQuery(changelogQuery);
  assertEquals(1, changelogRows.getTotalRows());
  for (FieldValueList changelogRow : changelogRows.iterateAll()) {
    assertEquals(initialFirstName, changelogRow.get("FirstName").getStringValue());
    assertEquals(newLastName, changelogRow.get("LastName").getStringValue());
  }
}

@Test
public void testSpannerChangeStreamsToBigQueryFloatColumns() throws IOException {
String spannerTable = testName + RandomStringUtils.randomAlphanumeric(1, 5);
Expand Down
Loading