Automated rollback of changelist 629547859
PiperOrigin-RevId: 632585342
cloud-teleport committed May 10, 2024
1 parent 65dd988 commit ee6f318
Showing 16 changed files with 297 additions and 1,178 deletions.
@@ -24,13 +24,16 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.TrackedSpannerTable;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SpannerChangeStreamsUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SpannerToBigQueryUtils;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.common.collect.ImmutableSet;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
@@ -44,17 +47,32 @@
*/
public final class BigQueryDynamicDestinations
extends DynamicDestinations<TableRow, KV<TableId, TableRow>> {

private final Map<String, TrackedSpannerTable> spannerTableByName;
private final String bigQueryProject, bigQueryDataset, bigQueryTableTemplate;
private final Boolean useStorageWriteApi;
private final ImmutableSet<String> ignoreFields;

public static BigQueryDynamicDestinations of(
BigQueryDynamicDestinationsOptions bigQueryDynamicDestinationsOptions) {
return new BigQueryDynamicDestinations(bigQueryDynamicDestinationsOptions);
Dialect dialect = getDialect(bigQueryDynamicDestinationsOptions.getSpannerConfig());
try (SpannerAccessor spannerAccessor =
SpannerAccessor.getOrCreate(bigQueryDynamicDestinationsOptions.getSpannerConfig())) {
Map<String, TrackedSpannerTable> spannerTableByName =
new SpannerChangeStreamsUtils(
spannerAccessor.getDatabaseClient(),
bigQueryDynamicDestinationsOptions.getChangeStreamName(),
dialect)
.getSpannerTableByName();
return new BigQueryDynamicDestinations(
bigQueryDynamicDestinationsOptions, spannerTableByName);
}
}

private BigQueryDynamicDestinations(
BigQueryDynamicDestinationsOptions bigQueryDynamicDestinationsOptions) {
BigQueryDynamicDestinationsOptions bigQueryDynamicDestinationsOptions,
Map<String, TrackedSpannerTable> spannerTableByName) {
this.spannerTableByName = spannerTableByName;
this.ignoreFields = bigQueryDynamicDestinationsOptions.getIgnoreFields();
this.bigQueryProject = bigQueryDynamicDestinationsOptions.getBigQueryProject();
this.bigQueryDataset = bigQueryDynamicDestinationsOptions.getBigQueryDataset();
@@ -87,8 +105,10 @@ public TableDestination getTable(KV<TableId, TableRow> destination) {
@Override
public TableSchema getSchema(KV<TableId, TableRow> destination) {
TableRow tableRow = destination.getValue();
// Get List<TableFieldSchema> for both user columns and metadata columns.
List<TableFieldSchema> fields = getFields(tableRow);
String spannerTableName =
(String) tableRow.get(BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME);
TrackedSpannerTable spannerTable = spannerTableByName.get(spannerTableName);
List<TableFieldSchema> fields = getFields(spannerTable);
List<TableFieldSchema> filteredFields = new ArrayList<>();
for (TableFieldSchema field : fields) {
if (!ignoreFields.contains(field.getName())) {
@@ -99,12 +119,9 @@ public TableSchema getSchema(KV<TableId, TableRow> destination) {
return new TableSchema().setFields(filteredFields);
}

// Returns List<TableFieldSchema> for user columns and metadata columns based on the parameter
// TableRow.
private List<TableFieldSchema> getFields(TableRow tableRow) {
// Add all user data fields (excluding metadata fields stored in metadataColumns).
private List<TableFieldSchema> getFields(TrackedSpannerTable spannerTable) {
List<TableFieldSchema> fields =
SpannerToBigQueryUtils.tableRowColumnsToBigQueryIOFields(tableRow, this.useStorageWriteApi);
SpannerToBigQueryUtils.spannerColumnsToBigQueryIOFields(spannerTable.getAllColumns());

// Add all metadata fields.
String requiredMode = Field.Mode.REQUIRED.name();
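The factory above now resolves the tracked Spanner tables once, eagerly, through a short-lived SpannerAccessor, so getSchema no longer derives fields from the incoming TableRow. A minimal usage sketch, not part of this commit, assuming the AutoValue builder exposes setters matching the getters referenced in the diff; the literal values and the spannerConfig variable are hypothetical:

// A minimal usage sketch (not part of this commit). It assumes the AutoValue
// builder exposes setters matching the getters referenced above; the literal
// values and spannerConfig are hypothetical.
BigQueryDynamicDestinationsOptions destinationsOptions =
    BigQueryDynamicDestinationsOptions.builder()
        .setSpannerConfig(spannerConfig)
        .setChangeStreamName("my_change_stream")
        .setIgnoreFields(ImmutableSet.of())
        .setBigQueryProject("my-project")
        .setBigQueryDataset("my_dataset")
        .setBigQueryTableTemplate("{spannerTableName}_changelog")
        .setUseStorageWriteApi(false)
        .build();

// of(...) performs the change-stream schema lookup up front; the
// try-with-resources in the diff closes the SpannerAccessor before the
// instance is handed to BigQueryIO.
BigQueryDynamicDestinations destinations =
    BigQueryDynamicDestinations.of(destinationsOptions);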
@@ -38,7 +38,6 @@
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.TrackedSpannerColumn;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.TrackedSpannerTable;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SchemaUpdateUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SpannerChangeStreamsUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SpannerToBigQueryUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
@@ -133,7 +132,6 @@ public static class FailsafeModJsonToTableRowFn
private transient CallContextConfigurator callContextConfigurator;
private transient boolean seenException;
private Boolean useStorageWriteApi;
private Dialect dialect;

public FailsafeModJsonToTableRowFn(
SpannerConfig spannerConfig,
@@ -148,7 +146,6 @@ public FailsafeModJsonToTableRowFn(
this.transformDeadLetterOut = transformDeadLetterOut;
this.ignoreFields = ignoreFields;
this.useStorageWriteApi = useStorageWriteApi;
this.dialect = getDialect(spannerConfig);
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
@@ -170,6 +167,7 @@ public <ReqT, RespT> ApiCallContext configure(

@Setup
public void setUp() {
Dialect dialect = getDialect(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
spannerTableByName =
new SpannerChangeStreamsUtils(
@@ -242,20 +240,11 @@ private TableRow modJsonStringToTableRow(String modJsonString) {
throw new RuntimeException(errorMessage, e);
}
String spannerTableName = mod.getTableName();
TrackedSpannerTable spannerTable =
checkStateNotNull(spannerTableByName.get(spannerTableName));
com.google.cloud.Timestamp spannerCommitTimestamp =
com.google.cloud.Timestamp.ofTimeSecondsAndNanos(
mod.getCommitTimestampSeconds(), mod.getCommitTimestampNanos());
// Detect schema updates (newly added tables/columns) from mod and propagate changes into
// spannerTableByName which stores schema information by table name.
// Not able to get schema update from DELETE mods as they have empty newValuesJson.
if (mod.getModType() != ModType.DELETE) {
spannerTableByName =
SchemaUpdateUtils.updateStoredSchemaIfNeeded(
spannerAccessor, spannerChangeStream, dialect, mod, spannerTableByName);
}

TrackedSpannerTable spannerTable =
checkStateNotNull(spannerTableByName.get(spannerTableName));

// Set metadata fields of the tableRow.
TableRow tableRow = new TableRow();
@@ -268,8 +257,15 @@ private TableRow modJsonStringToTableRow(String modJsonString) {
useStorageWriteApi);
JSONObject keysJsonObject = new JSONObject(mod.getKeysJson());
// Set Spanner key columns of the tableRow.
SpannerToBigQueryUtils.addSpannerPkColumnsToTableRow(
keysJsonObject, spannerTable.getPkColumns(), tableRow);
for (TrackedSpannerColumn spannerColumn : spannerTable.getPkColumns()) {
String spannerColumnName = spannerColumn.getName();
if (keysJsonObject.has(spannerColumnName)) {
tableRow.set(spannerColumnName, keysJsonObject.get(spannerColumnName));
} else {
throw new IllegalArgumentException(
"Cannot find value for key column " + spannerColumnName);
}
}

// For "DELETE" mod, we only need to set the key columns.
if (mod.getModType() == ModType.DELETE) {
@@ -357,7 +353,8 @@ private TableRow modJsonStringToTableRow(String modJsonString) {
return tableRow;
}

// Do a Spanner read to retrieve full row. Schema can change while the pipeline is running.
// Do a Spanner read to retrieve the full row. Schema changes are currently not supported,
// so we assume the schema doesn't change while the pipeline is running.
private void readSpannerRow(
String spannerTableName,
com.google.cloud.spanner.Key key,
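The hunk above replaces SpannerToBigQueryUtils.addSpannerPkColumnsToTableRow with an inline loop: each primary-key column is copied from the mod's keysJson into the TableRow, and a missing key column is a hard failure rather than a silent null. A self-contained sketch of that behavior, with plain strings standing in for TrackedSpannerColumn and hypothetical table and key names:

import com.google.api.services.bigquery.model.TableRow;
import java.util.List;
import org.json.JSONObject;

public class PkCopySketch {
  public static void main(String[] args) {
    // Hypothetical keysJson for a mod on a table with a two-column primary key.
    JSONObject keysJsonObject = new JSONObject("{\"SingerId\": 42, \"AlbumId\": 7}");
    List<String> pkColumnNames = List.of("SingerId", "AlbumId");

    TableRow tableRow = new TableRow();
    for (String spannerColumnName : pkColumnNames) {
      if (keysJsonObject.has(spannerColumnName)) {
        tableRow.set(spannerColumnName, keysJsonObject.get(spannerColumnName));
      } else {
        // Mirrors the diff: a key column missing from keysJson aborts the row.
        throw new IllegalArgumentException(
            "Cannot find value for key column " + spannerColumnName);
      }
    }
    // tableRow now carries SingerId=42 and AlbumId=7 alongside any metadata fields.
  }
}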
@@ -25,7 +25,6 @@
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToBigQueryOptions;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.Mod;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.ModColumnType;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.OptionsUtils;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
@@ -36,7 +35,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -378,39 +376,19 @@ public void process(
.setBigQueryTableTemplate(options.getBigQueryChangelogTableNameTemplate())
.setUseStorageWriteApi(options.getUseStorageWriteApi())
.build();
WriteResult writeResult;
if (!options.getUseStorageWriteApi()) {
writeResult =
tableRowTuple
.get(failsafeModJsonToTableRow.transformOut)
.apply(
"Write To BigQuery",
BigQueryIO.<TableRow>write()
.to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
.withFormatFunction(element -> removeIntermediateMetadataFields(element))
.withFormatRecordOnFailureFunction(element -> element)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
} else {
writeResult =
tableRowTuple
.get(failsafeModJsonToTableRow.transformOut)
.apply(
"Write To BigQuery",
BigQueryIO.<TableRow>write()
.to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
.withFormatFunction(element -> removeIntermediateMetadataFields(element))
.withFormatRecordOnFailureFunction(element -> element)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.ignoreUnknownValues()
.withAutoSchemaUpdate(true) // only supported when using STORAGE_WRITE_API or
// STORAGE_API_AT_LEAST_ONCE.
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
}
WriteResult writeResult =
tableRowTuple
.get(failsafeModJsonToTableRow.transformOut)
.apply(
"Write To BigQuery",
BigQueryIO.<TableRow>write()
.to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
.withFormatFunction(element -> removeIntermediateMetadataFields(element))
.withFormatRecordOnFailureFunction(element -> element)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

PCollection<String> transformDlqJson =
tableRowTuple
@@ -467,7 +445,7 @@ private static DeadLetterQueueManager buildDlqManager(
? tempLocation + "dlq/"
: options.getDeadLetterQueueDirectory();

LOG.info("Dead letter queue directory: {}" + dlqDirectory);
LOG.info("Dead letter queue directory: {}", dlqDirectory);
return DeadLetterQueueManager.create(dlqDirectory, DLQ_MAX_RETRIES);
}

@@ -483,8 +461,6 @@ private static TableRow removeIntermediateMetadataFields(TableRow tableRow) {
for (String rowKey : rowKeys) {
if (metadataFields.contains(rowKey)) {
cleanTableRow.remove(rowKey);
} else if (rowKeys.contains("_type_" + rowKey)) {
cleanTableRow.remove("_type_" + rowKey);
}
}

@@ -510,7 +486,6 @@ public void process(@Element DataChangeRecord input, OutputReceiver<String> rece
input.isLastRecordInTransactionInPartition(),
input.getRecordSequence(),
input.getTableName(),
input.getRowType().stream().map(ModColumnType::new).collect(Collectors.toList()),
input.getModType(),
input.getValueCaptureType(),
input.getNumberOfRecordsInTransaction(),
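With rowType gone from the Mod payload, removeIntermediateMetadataFields above loses its "_type_"-prefixed branch, leaving only the plain metadata filter. A sketch of the surviving logic, with hypothetical field names standing in for the BigQueryUtils changelog constants:

import com.google.api.services.bigquery.model.TableRow;
import java.util.HashSet;
import java.util.Set;

public class MetadataStripSketch {
  static TableRow removeIntermediateMetadataFields(TableRow tableRow) {
    TableRow cleanTableRow = tableRow.clone();
    // Hypothetical stand-ins for the intermediate metadata field names.
    Set<String> metadataFields = Set.of("_metadata_error", "_metadata_retry_count");
    // Copy the key set so removal doesn't mutate the collection being iterated.
    for (String rowKey : new HashSet<>(tableRow.keySet())) {
      if (metadataFields.contains(rowKey)) {
        cleanTableRow.remove(rowKey);
      }
    }
    return cleanTableRow;
  }
}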
@@ -20,10 +20,6 @@
import com.google.cloud.Timestamp;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.DefaultCoder;
@@ -50,12 +46,10 @@ public final class Mod implements Serializable {
private boolean isLastRecordInTransactionInPartition;
private String recordSequence;
private String tableName;
private List<ModColumnType> rowType;
private ModType modType;
private ValueCaptureType valueCaptureType;
private long numberOfRecordsInTransaction;
private long numberOfPartitionsInTransaction;
private Map<String, ModColumnType> rowTypeAsMap;

/** Default constructor for serialization only. */
private Mod() {}
@@ -74,7 +68,6 @@ private Mod() {}
* Spanner transaction. The value is unique and monotonically increasing in the context of a
* particular serverTransactionId
* @param tableName the name of the table in which the modifications occurred
* @param rowType the type information of primary keys and modified columns
* @param modType the operation that caused the modification to occur, can only be one of INSERT,
* UPDATE and DELETE
* @param valueCaptureType the value capture type of the change stream
Expand All @@ -96,7 +89,6 @@ public Mod(
boolean isLastRecordInTransactionInPartition,
String recordSequence,
String tableName,
List<ModColumnType> rowType,
ModType modType,
ValueCaptureType valueCaptureType,
long numberOfRecordsInTransaction,
@@ -109,15 +101,10 @@ public Mod(
this.isLastRecordInTransactionInPartition = isLastRecordInTransactionInPartition;
this.recordSequence = recordSequence;
this.tableName = tableName;
this.rowType = rowType;
this.modType = modType;
this.valueCaptureType = valueCaptureType;
this.numberOfRecordsInTransaction = numberOfRecordsInTransaction;
this.numberOfPartitionsInTransaction = numberOfPartitionsInTransaction;
this.rowTypeAsMap = rowType == null ? Collections.emptyMap() : new HashMap<>();
for (ModColumnType modColumnType : rowType) {
this.rowTypeAsMap.put(modColumnType.getName(), modColumnType);
}
}

public static Mod fromJson(String json) throws IOException {
@@ -189,16 +176,6 @@ public String getTableName() {
return tableName;
}

/** The type information of primary keys and modified columns. */
public List<ModColumnType> getRowType() {
return rowType;
}

/** The map containing ColumnType information for columns included in the mod. */
public Map<String, ModColumnType> getRowTypeAsMap() {
return rowTypeAsMap;
}

/** The type of operation that caused the modifications within this record. */
public ModType getModType() {
return modType;
@@ -237,7 +214,6 @@ public boolean equals(@javax.annotation.Nullable Object o) {
&& Objects.equals(serverTransactionId, that.serverTransactionId)
&& Objects.equals(recordSequence, that.recordSequence)
&& Objects.equals(tableName, that.tableName)
&& Objects.equals(rowType, that.rowType)
&& modType == that.modType
&& valueCaptureType == that.valueCaptureType;
}
@@ -252,7 +228,6 @@ public int hashCode() {
isLastRecordInTransactionInPartition,
recordSequence,
tableName,
rowType,
modType,
valueCaptureType,
numberOfRecordsInTransaction,
@@ -280,8 +255,6 @@ public String toString() {
+ ", tableName='"
+ tableName
+ '\''
+ ", rowType="
+ rowType
+ ", modType="
+ modType
+ ", valueCaptureType="
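Note that rowType is removed from equals, hashCode, and toString together: a field compared in equals but omitted from hashCode would break the contract that equal Mod instances share a hash code. A minimal, self-contained illustration with hypothetical stand-in fields:

import java.util.Objects;

// Fields compared in equals must also feed hashCode, so the rollback drops
// rowType from both (and from toString) in lockstep.
final class ValueExample {
  private final String tableName;      // stand-ins for Mod's remaining fields
  private final String recordSequence;

  ValueExample(String tableName, String recordSequence) {
    this.tableName = tableName;
    this.recordSequence = recordSequence;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ValueExample)) {
      return false;
    }
    ValueExample that = (ValueExample) o;
    return Objects.equals(tableName, that.tableName)
        && Objects.equals(recordSequence, that.recordSequence);
  }

  @Override
  public int hashCode() {
    return Objects.hash(tableName, recordSequence);
  }
}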