diff --git a/CHANGES.md b/CHANGES.md index d082f03fd310e..950abc694488e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -69,6 +69,7 @@ * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * Go SDK Minimum Go Version updated to 1.21 ([#32092](https://github.com/apache/beam/pull/32092)). +* [BigQueryIO] Added support for withFormatRecordOnFailureFunction() for STORAGE_WRITE_API and STORAGE_API_AT_LEAST_ONCE methods (Java) ([#31354](https://github.com/apache/beam/issues/31354)). * Updated Go protobuf package to new version (Go) ([#21515](https://github.com/apache/beam/issues/21515)). ## Breaking Changes diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 1238271c791ea..2a16bf31a6cba 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -2711,9 +2711,14 @@ public Write withFormatFunction(SerializableFunction formatFunct } /** - * If an insert failure occurs, this function is applied to the originally supplied row T. The - * resulting {@link TableRow} will be accessed via {@link - * WriteResult#getFailedInsertsWithErr()}. + * If an insert failure occurs, this function is applied to the originally supplied T element. + * + *

For {@link Method#STREAMING_INSERTS} method, the resulting {@link TableRow} will be + * accessed via {@link WriteResult#getFailedInsertsWithErr()}. + * + *

For {@link Method#STORAGE_WRITE_API} and {@link Method#STORAGE_API_AT_LEAST_ONCE} methods, + * the resulting {@link TableRow} will be accessed via {@link + * WriteResult#getFailedStorageApiInserts()}. */ public Write withFormatRecordOnFailureFunction( SerializableFunction formatFunction) { @@ -3773,6 +3778,7 @@ private WriteResult continueExpandTyped( dynamicDestinations, elementSchema, elementToRowFunction, + getFormatRecordOnFailureFunction(), getRowMutationInformationFn() != null); } else if (getWriteProtosClass() != null && getDirectWriteProtos()) { // We could support both of these by falling back to @@ -3795,7 +3801,9 @@ private WriteResult continueExpandTyped( storageApiDynamicDestinations = (StorageApiDynamicDestinations) new StorageApiDynamicDestinationsProto( - dynamicDestinations, getWriteProtosClass()); + dynamicDestinations, + getWriteProtosClass(), + getFormatRecordOnFailureFunction()); } else if (getAvroRowWriterFactory() != null) { // we can configure the avro to storage write api proto converter for this // assuming the format function returns an Avro GenericRecord @@ -3818,6 +3826,7 @@ private WriteResult continueExpandTyped( dynamicDestinations, avroSchemaFactory, recordWriterFactory.getToAvroFn(), + getFormatRecordOnFailureFunction(), getRowMutationInformationFn() != null); } else { RowWriterFactory.TableRowWriterFactory tableRowWriterFactory = @@ -3827,6 +3836,7 @@ private WriteResult continueExpandTyped( new StorageApiDynamicDestinationsTableRow<>( dynamicDestinations, tableRowWriterFactory.getToRowFn(), + getFormatRecordOnFailureFunction(), getRowMutationInformationFn() != null, getCreateDisposition(), getIgnoreUnknownValues(), diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java index b8eeb2522cf2b..e40824eab08b6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java @@ -21,6 +21,7 @@ import com.google.auto.value.AutoValue; import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.protobuf.ByteString; +import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -42,6 +43,8 @@ abstract static class Value { abstract ProtoRows getProtoRows(); abstract List getTimestamps(); + + abstract List<@Nullable TableRow> getFailsafeTableRows(); } interface ConvertUnknownFields { @@ -96,11 +99,18 @@ public Value next() { } List timestamps = Lists.newArrayList(); + List<@Nullable TableRow> failsafeRows = Lists.newArrayList(); ProtoRows.Builder inserts = ProtoRows.newBuilder(); long bytesSize = 0; while (underlyingIterator.hasNext()) { StorageApiWritePayload payload = underlyingIterator.next(); ByteString byteString = ByteString.copyFrom(payload.getPayload()); + @Nullable TableRow failsafeTableRow = null; + try { + failsafeTableRow = payload.getFailsafeTableRow(); + } catch (IOException e) { + // Do nothing, table row will be generated later from row bytes + } if (autoUpdateSchema) { try { @Nullable TableRow unknownFields = payload.getUnknownFields(); @@ -116,7 +126,10 @@ public Value next() { // This generally implies that ignoreUnknownValues=false and there were still // unknown values here. // Reconstitute the TableRow and send it to the failed-rows consumer. - TableRow tableRow = protoToTableRow.apply(byteString); + TableRow tableRow = + failsafeTableRow != null + ? failsafeTableRow + : protoToTableRow.apply(byteString); // TODO(24926, reuvenlax): We need to merge the unknown fields in! Currently we // only execute this // codepath when ignoreUnknownFields==true, so we should never hit this codepath. @@ -142,12 +155,13 @@ public Value next() { timestamp = elementsTimestamp; } timestamps.add(timestamp); + failsafeRows.add(failsafeTableRow); bytesSize += byteString.size(); if (bytesSize > splitSize) { break; } } - return new AutoValue_SplittingIterable_Value(inserts.build(), timestamps); + return new AutoValue_SplittingIterable_Value(inserts.build(), timestamps, failsafeRows); } }; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java index aefdb79c535c8..0c6f82b9df813 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java @@ -179,15 +179,17 @@ public void processElement( .withTimestamp(timestamp); o.get(successfulWritesTag).output(KV.of(element.getKey(), payload)); } catch (TableRowToStorageApiProto.SchemaConversionException conversionException) { - TableRow tableRow; + TableRow failsafeTableRow; try { - tableRow = messageConverter.toTableRow(element.getValue()); + failsafeTableRow = messageConverter.toFailsafeTableRow(element.getValue()); } catch (Exception e) { badRecordRouter.route(o, element, elementCoder, e, "Unable to convert value to TableRow"); return; } o.get(failedWritesTag) - .output(new BigQueryStorageApiInsertError(tableRow, conversionException.toString())); + .output( + new BigQueryStorageApiInsertError( + failsafeTableRow, conversionException.toString())); } catch (Exception e) { badRecordRouter.route( o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload"); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java index 8ec4d52e3b90f..87667ef2cb171 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java @@ -34,7 +34,7 @@ public interface MessageConverter { StorageApiWritePayload toMessage( T element, @Nullable RowMutationInformation rowMutationInformation) throws Exception; - TableRow toTableRow(T element); + TableRow toFailsafeTableRow(T element); } StorageApiDynamicDestinations(DynamicDestinations inner) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java index 70ecb06d5b8d7..fd5fe27f0c7c6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java @@ -35,6 +35,7 @@ class StorageApiDynamicDestinationsBeamRow { private final TableSchema tableSchema; private final SerializableFunction toRow; + private final @Nullable SerializableFunction formatRecordOnFailureFunction; private final boolean usesCdc; @@ -42,10 +43,12 @@ class StorageApiDynamicDestinationsBeamRow inner, Schema schema, SerializableFunction toRow, + @Nullable SerializableFunction formatRecordOnFailureFunction, boolean usesCdc) { super(inner); this.tableSchema = BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(schema); this.toRow = toRow; + this.formatRecordOnFailureFunction = formatRecordOnFailureFunction; this.usesCdc = usesCdc; } @@ -96,12 +99,19 @@ public StorageApiWritePayload toMessage( Message msg = BeamRowToStorageApiProto.messageFromBeamRow( descriptorToUse, toRow.apply(element), changeType, changeSequenceNum); - return StorageApiWritePayload.of(msg.toByteArray(), null); + return StorageApiWritePayload.of( + msg.toByteArray(), + null, + formatRecordOnFailureFunction != null ? toFailsafeTableRow(element) : null); } @Override - public TableRow toTableRow(T element) { - return BigQueryUtils.toTableRow(toRow.apply(element)); + public TableRow toFailsafeTableRow(T element) { + if (formatRecordOnFailureFunction != null) { + return formatRecordOnFailureFunction.apply(element); + } else { + return BigQueryUtils.toTableRow(toRow.apply(element)); + } } }; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java index c96bb4ce75236..a387495863a26 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java @@ -36,16 +36,21 @@ class StorageApiDynamicDestinationsGenericRecord, GenericRecord> toGenericRecord; private final SerializableFunction<@Nullable TableSchema, Schema> schemaFactory; + private final @javax.annotation.Nullable SerializableFunction + formatRecordOnFailureFunction; + private boolean usesCdc; StorageApiDynamicDestinationsGenericRecord( DynamicDestinations inner, SerializableFunction<@Nullable TableSchema, Schema> schemaFactory, SerializableFunction, GenericRecord> toGenericRecord, + @Nullable SerializableFunction formatRecordOnFailureFunction, boolean usesCdc) { super(inner); this.toGenericRecord = toGenericRecord; this.schemaFactory = schemaFactory; + this.formatRecordOnFailureFunction = formatRecordOnFailureFunction; this.usesCdc = usesCdc; } @@ -96,13 +101,20 @@ public StorageApiWritePayload toMessage( toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)), changeType, changeSequenceNum); - return StorageApiWritePayload.of(msg.toByteArray(), null); + return StorageApiWritePayload.of( + msg.toByteArray(), + null, + formatRecordOnFailureFunction != null ? toFailsafeTableRow(element) : null); } @Override - public TableRow toTableRow(T element) { - return BigQueryUtils.convertGenericRecordToTableRow( - toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)), bqTableSchema); + public TableRow toFailsafeTableRow(T element) { + if (formatRecordOnFailureFunction != null) { + return formatRecordOnFailureFunction.apply(element); + } else { + return BigQueryUtils.convertGenericRecordToTableRow( + toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)), bqTableSchema); + } } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java index 57dbdc9d1e770..d7359f99b96df 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java @@ -22,23 +22,29 @@ import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; import com.google.protobuf.Message; import java.lang.reflect.InvocationTargetException; import javax.annotation.Nullable; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.Preconditions; import org.checkerframework.checker.nullness.qual.NonNull; /** Storage API DynamicDestinations used when the input is a compiled protocol buffer. */ class StorageApiDynamicDestinationsProto extends StorageApiDynamicDestinations { - DescriptorProtos.DescriptorProto descriptorProto; + private final DescriptorProtos.DescriptorProto descriptorProto; + private final @Nullable SerializableFunction formatRecordOnFailureFunction; @SuppressWarnings({"unchecked", "nullness"}) StorageApiDynamicDestinationsProto( - DynamicDestinations inner, Class protoClass) { + DynamicDestinations inner, + Class protoClass, + @Nullable SerializableFunction formatRecordOnFailureFunction) { super(inner); try { + this.formatRecordOnFailureFunction = formatRecordOnFailureFunction; this.descriptorProto = fixNestedTypes( (Descriptors.Descriptor) @@ -84,12 +90,27 @@ public StorageApiWritePayload toMessage( // we can forward // the through directly. This means that we don't currently support ignoreUnknownValues or // autoUpdateSchema. - return StorageApiWritePayload.of(element.toByteArray(), null); + return StorageApiWritePayload.of( + element.toByteArray(), + null, + formatRecordOnFailureFunction != null ? toFailsafeTableRow(element) : null); } @Override - public TableRow toTableRow(T element) { - throw new RuntimeException("Not implemented!"); + public TableRow toFailsafeTableRow(T element) { + if (formatRecordOnFailureFunction != null) { + return formatRecordOnFailureFunction.apply(element); + } else { + try { + return TableRowToStorageApiProto.tableRowFromMessage( + DynamicMessage.parseFrom( + TableRowToStorageApiProto.wrapDescriptorProto(descriptorProto), + element.toByteArray()), + true); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } }; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java index 264dac34473ed..08588cfc78500 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java @@ -36,6 +36,7 @@ public class StorageApiDynamicDestinationsTableRow extends StorageApiDynamicDestinations { private final SerializableFunction formatFunction; + private final @Nullable SerializableFunction formatRecordOnFailureFunction; private final boolean usesCdc; private final CreateDisposition createDisposition; @@ -51,12 +52,14 @@ public class StorageApiDynamicDestinationsTableRow inner, SerializableFunction formatFunction, + @Nullable SerializableFunction formatRecordOnFailureFunction, boolean usesCdc, CreateDisposition createDisposition, boolean ignoreUnknownValues, boolean autoSchemaUpdates) { super(inner); this.formatFunction = formatFunction; + this.formatRecordOnFailureFunction = formatRecordOnFailureFunction; this.usesCdc = usesCdc; this.createDisposition = createDisposition; this.ignoreUnknownValues = ignoreUnknownValues; @@ -151,8 +154,12 @@ public DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns) } @Override - public TableRow toTableRow(T element) { - return formatFunction.apply(element); + public TableRow toFailsafeTableRow(T element) { + if (formatRecordOnFailureFunction != null) { + return formatRecordOnFailureFunction.apply(element); + } else { + return formatFunction.apply(element); + } } @Override @@ -183,7 +190,10 @@ public StorageApiWritePayload toMessage( unknownFields, changeType, changeSequenceNum); - return StorageApiWritePayload.of(msg.toByteArray(), unknownFields); + return StorageApiWritePayload.of( + msg.toByteArray(), + unknownFields, + formatRecordOnFailureFunction != null ? toFailsafeTableRow(element) : null); } }; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java index 5b6f27949870b..f0fce11b2d32b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java @@ -39,12 +39,17 @@ public abstract class StorageApiWritePayload { public abstract @Nullable Instant getTimestamp(); + @SuppressWarnings("mutable") + public abstract @Nullable byte[] getFailsafeTableRowPayload(); + @AutoValue.Builder public abstract static class Builder { public abstract Builder setPayload(byte[] value); public abstract Builder setUnknownFieldsPayload(@Nullable byte[] value); + public abstract Builder setFailsafeTableRowPayload(@Nullable byte[] value); + public abstract Builder setTimestamp(@Nullable Instant value); public abstract StorageApiWritePayload build(); @@ -53,15 +58,22 @@ public abstract static class Builder { public abstract Builder toBuilder(); @SuppressWarnings("nullness") - static StorageApiWritePayload of(byte[] payload, @Nullable TableRow unknownFields) + static StorageApiWritePayload of( + byte[] payload, @Nullable TableRow unknownFields, @Nullable TableRow failsafeTableRow) throws IOException { @Nullable byte[] unknownFieldsPayload = null; if (unknownFields != null) { unknownFieldsPayload = CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), unknownFields); } + @Nullable byte[] failsafeTableRowPayload = null; + if (failsafeTableRow != null) { + failsafeTableRowPayload = + CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), failsafeTableRow); + } return new AutoValue_StorageApiWritePayload.Builder() .setPayload(payload) .setUnknownFieldsPayload(unknownFieldsPayload) + .setFailsafeTableRowPayload(failsafeTableRowPayload) .setTimestamp(null) .build(); } @@ -77,4 +89,12 @@ public StorageApiWritePayload withTimestamp(Instant instant) { } return CoderUtils.decodeFromByteArray(TableRowJsonCoder.of(), fields); } + + public @Memoized @Nullable TableRow getFailsafeTableRow() throws IOException { + @Nullable byte[] failsafeTableRowPayload = getFailsafeTableRowPayload(); + if (failsafeTableRowPayload == null) { + return null; + } + return CoderUtils.decodeFromByteArray(TableRowJsonCoder.of(), failsafeTableRowPayload); + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 8a902ec6d264e..369bb2d78634c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.api.core.ApiFuture; @@ -257,15 +258,20 @@ static class AppendRowsContext extends RetryManager.Operation.Context timestamps; + List<@Nullable TableRow> failsafeTableRows; int failureCount; public AppendRowsContext( - long offset, ProtoRows protoRows, List timestamps) { + long offset, + ProtoRows protoRows, + List timestamps, + List<@Nullable TableRow> failsafeTableRows) { this.offset = offset; this.protoRows = protoRows; this.timestamps = timestamps; this.failureCount = 0; + this.failsafeTableRows = failsafeTableRows; } } @@ -278,6 +284,7 @@ class DestinationState { private long currentOffset = 0; private List pendingMessages; private List pendingTimestamps; + private List<@Nullable TableRow> pendingFailsafeTableRows; private transient @Nullable WriteStreamService maybeWriteStreamService; private final Counter recordsAppended = Metrics.counter(WriteRecordsDoFn.class, "recordsAppended"); @@ -319,6 +326,7 @@ public DestinationState( this.shortTableUrn = shortTableUrn; this.pendingMessages = Lists.newArrayList(); this.pendingTimestamps = Lists.newArrayList(); + this.pendingFailsafeTableRows = Lists.newArrayList(); this.maybeWriteStreamService = writeStreamService; this.useDefaultStream = useDefaultStream; this.initialTableSchema = messageConverter.getTableSchema(); @@ -553,6 +561,7 @@ void addMessage( throws Exception { maybeTickleCache(); ByteString payloadBytes = ByteString.copyFrom(payload.getPayload()); + @Nullable TableRow failsafeTableRow = payload.getFailsafeTableRow(); if (autoUpdateSchema) { if (appendClientInfo == null) { appendClientInfo = getAppendClientInfo(true, null); @@ -565,7 +574,10 @@ void addMessage( Preconditions.checkStateNotNull(appendClientInfo) .encodeUnknownFields(unknownFields, ignoreUnknownValues)); } catch (TableRowToStorageApiProto.SchemaConversionException e) { - TableRow tableRow = appendClientInfo.toTableRow(payloadBytes); + @Nullable TableRow tableRow = payload.getFailsafeTableRow(); + if (tableRow == null) { + tableRow = checkNotNull(appendClientInfo).toTableRow(payloadBytes); + } // TODO(24926, reuvenlax): We need to merge the unknown fields in! Currently we only // execute this // codepath when ignoreUnknownFields==true, so we should never hit this codepath. @@ -583,6 +595,8 @@ void addMessage( } } pendingMessages.add(payloadBytes); + pendingFailsafeTableRows.add(failsafeTableRow); + org.joda.time.Instant timestamp = payload.getTimestamp(); pendingTimestamps.add(timestamp != null ? timestamp : elementTs); } @@ -601,7 +615,9 @@ long flush( pendingMessages.clear(); final ProtoRows inserts = insertsBuilder.build(); List insertTimestamps = pendingTimestamps; + List<@Nullable TableRow> failsafeTableRows = pendingFailsafeTableRows; pendingTimestamps = Lists.newArrayList(); + pendingFailsafeTableRows = Lists.newArrayList(); // Handle the case where the request is too large. if (inserts.getSerializedSize() >= maxRequestSize) { @@ -616,15 +632,18 @@ long flush( maxRequestSize); } for (int i = 0; i < inserts.getSerializedRowsCount(); ++i) { - ByteString rowBytes = inserts.getSerializedRows(i); + @Nullable TableRow failedRow = failsafeTableRows.get(i); + if (failedRow == null) { + ByteString rowBytes = inserts.getSerializedRows(i); + failedRow = + TableRowToStorageApiProto.tableRowFromMessage( + DynamicMessage.parseFrom( + TableRowToStorageApiProto.wrapDescriptorProto( + getAppendClientInfo(true, null).getDescriptor()), + rowBytes), + true); + } org.joda.time.Instant timestamp = insertTimestamps.get(i); - TableRow failedRow = - TableRowToStorageApiProto.tableRowFromMessage( - DynamicMessage.parseFrom( - TableRowToStorageApiProto.wrapDescriptorProto( - getAppendClientInfo(true, null).getDescriptor()), - rowBytes), - true); failedRowsReceiver.outputWithTimestamp( new BigQueryStorageApiInsertError( failedRow, "Row payload too large. Maximum size " + maxRequestSize), @@ -647,7 +666,7 @@ long flush( this.currentOffset += inserts.getSerializedRowsCount(); } AppendRowsContext appendRowsContext = - new AppendRowsContext(offset, inserts, insertTimestamps); + new AppendRowsContext(offset, inserts, insertTimestamps, failsafeTableRows); retryManager.addOperation( c -> { @@ -692,18 +711,22 @@ long flush( Set failedRowIndices = error.getRowIndexToErrorMessage().keySet(); for (int failedIndex : failedRowIndices) { // Convert the message to a TableRow and send it to the failedRows collection. - ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex); - org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex); BigQueryStorageApiInsertError element = null; + org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex); try { - TableRow failedRow = - TableRowToStorageApiProto.tableRowFromMessage( - DynamicMessage.parseFrom( - TableRowToStorageApiProto.wrapDescriptorProto( - Preconditions.checkStateNotNull(appendClientInfo) - .getDescriptor()), - protoBytes), - true); + TableRow failedRow = failedContext.failsafeTableRows.get(failedIndex); + if (failedRow == null) { + ByteString protoBytes = + failedContext.protoRows.getSerializedRows(failedIndex); + failedRow = + TableRowToStorageApiProto.tableRowFromMessage( + DynamicMessage.parseFrom( + TableRowToStorageApiProto.wrapDescriptorProto( + Preconditions.checkStateNotNull(appendClientInfo) + .getDescriptor()), + protoBytes), + true); + } element = new BigQueryStorageApiInsertError( failedRow, error.getRowIndexToErrorMessage().get(failedIndex)); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index a7da19a75f850..f3f512110b50f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -149,12 +149,17 @@ class AppendRowsContext extends RetryManager.Operation.Context timestamps; + List<@Nullable TableRow> failsafeTableRows; AppendRowsContext( - ShardedKey key, ProtoRows protoRows, List timestamps) { + ShardedKey key, + ProtoRows protoRows, + List timestamps, + List<@Nullable TableRow> failsafeTableRows) { this.key = key; this.protoRows = protoRows; this.timestamps = timestamps; + this.failsafeTableRows = failsafeTableRows; } @Override @@ -685,8 +690,11 @@ public void process( Set failedRowIndices = error.getRowIndexToErrorMessage().keySet(); for (int failedIndex : failedRowIndices) { // Convert the message to a TableRow and send it to the failedRows collection. - ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex); - TableRow failedRow = appendClientInfo.get().toTableRow(protoBytes); + TableRow failedRow = failedContext.failsafeTableRows.get(failedIndex); + if (failedRow == null) { + ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex); + failedRow = appendClientInfo.get().toTableRow(protoBytes); + } org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex); o.get(failedRowsTag) .outputWithTimestamp( @@ -851,9 +859,12 @@ public void process( + ". This is unexpected. All rows in the request will be sent to the failed-rows PCollection."); } for (int i = 0; i < splitValue.getProtoRows().getSerializedRowsCount(); ++i) { - ByteString rowBytes = splitValue.getProtoRows().getSerializedRows(i); org.joda.time.Instant timestamp = splitValue.getTimestamps().get(i); - TableRow failedRow = appendClientInfo.get().toTableRow(rowBytes); + TableRow failedRow = splitValue.getFailsafeTableRows().get(i); + if (failedRow == null) { + ByteString rowBytes = splitValue.getProtoRows().getSerializedRows(i); + failedRow = appendClientInfo.get().toTableRow(rowBytes); + } o.get(failedRowsTag) .outputWithTimestamp( new BigQueryStorageApiInsertError( @@ -872,7 +883,10 @@ public void process( // RetryManager AppendRowsContext context = new AppendRowsContext( - element.getKey(), splitValue.getProtoRows(), splitValue.getTimestamps()); + element.getKey(), + splitValue.getProtoRows(), + splitValue.getTimestamps(), + splitValue.getFailsafeTableRows()); contexts.add(context); retryManager.addOperation(runOperation, onError, onSuccess, context); recordsAppended.inc(splitValue.getProtoRows().getSerializedRowsCount()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index c5af8045bfe20..2736ed7beb881 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -83,6 +83,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.LongStream; import java.util.stream.StreamSupport; import org.apache.avro.Schema.Field; @@ -3065,7 +3066,312 @@ public void testStreamingInsertsExtendedErrorRetrieval() throws Exception { } @Test - public void testStorageApiErrors() throws Exception { + public void testStorageApiErrorsWriteProto() throws Exception { + assumeTrue(useStorageApi); + final Method method = + useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API; + + final int failFrom = 10; + + Function getPrimitive = + (Integer i) -> + Proto3SchemaMessages.Primitive.newBuilder() + .setPrimitiveDouble(i) + .setPrimitiveFloat(i) + .setPrimitiveInt32(i) + .setPrimitiveInt64(i) + .setPrimitiveUint32(i) + .setPrimitiveUint64(i) + .setPrimitiveSint32(i) + .setPrimitiveSint64(i) + .setPrimitiveFixed32(i) + .setPrimitiveFixed64(i) + .setPrimitiveBool(true) + .setPrimitiveString(Integer.toString(i)) + .setPrimitiveBytes( + ByteString.copyFrom(Integer.toString(i).getBytes(StandardCharsets.UTF_8))) + .build(); + List goodRows = + IntStream.range(1, 20).mapToObj(getPrimitive::apply).collect(Collectors.toList()); + + Function getPrimitiveRow = + (Integer i) -> + new TableRow() + .set("primitive_double", Double.valueOf(i)) + .set("primitive_float", Float.valueOf(i).doubleValue()) + .set("primitive_int32", i.intValue()) + .set("primitive_int64", i.toString()) + .set("primitive_uint32", i.toString()) + .set("primitive_uint64", i.toString()) + .set("primitive_sint32", i.toString()) + .set("primitive_sint64", i.toString()) + .set("primitive_fixed32", i.toString()) + .set("primitive_fixed64", i.toString()) + .set("primitive_bool", true) + .set("primitive_string", i.toString()) + .set( + "primitive_bytes", + BaseEncoding.base64() + .encode( + ByteString.copyFrom(i.toString().getBytes(StandardCharsets.UTF_8)) + .toByteArray())); + + Function shouldFailRow = + (Function & Serializable) + tr -> + tr.containsKey("primitive_int32") + && (Integer) tr.get("primitive_int32") >= failFrom; + fakeDatasetService.setShouldFailRow(shouldFailRow); + + SerializableFunction formatRecordOnFailureFunction = + input -> { + TableRow failedTableRow = new TableRow().set("testFailureFunctionField", "testValue"); + failedTableRow.set("originalValue", input.getPrimitiveFixed32()); + return failedTableRow; + }; + + WriteResult result = + p.apply(Create.of(goodRows)) + .apply( + BigQueryIO.writeProtos(Proto3SchemaMessages.Primitive.class) + .to("project-id:dataset-id.table") + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withMethod(method) + .withoutValidation() + .withFormatRecordOnFailureFunction(formatRecordOnFailureFunction) + .withPropagateSuccessfulStorageApiWrites(true) + .withTestServices(fakeBqServices)); + + PCollection deadRows = + result + .getFailedStorageApiInserts() + .apply( + MapElements.into(TypeDescriptor.of(TableRow.class)) + .via(BigQueryStorageApiInsertError::getRow)); + + List expectedFailedRows = + goodRows.stream() + .filter(primitive -> primitive.getPrimitiveFixed32() >= failFrom) + .map(formatRecordOnFailureFunction::apply) + .collect(Collectors.toList()); + PAssert.that(deadRows).containsInAnyOrder(expectedFailedRows); + p.run(); + + // Round trip through the coder to make sure the types match our expected types. + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table").stream() + .map( + tr -> { + try { + byte[] bytes = CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), tr); + return CoderUtils.decodeFromByteArray(TableRowJsonCoder.of(), bytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()), + containsInAnyOrder( + Iterables.toArray( + Iterables.filter( + goodRows.stream() + .map(primitive -> getPrimitiveRow.apply(primitive.getPrimitiveFixed32())) + .collect(Collectors.toList()), + r -> !shouldFailRow.apply(r)), + TableRow.class))); + } + + @Test + public void testStorageApiErrorsWriteBeamRow() throws Exception { + assumeTrue(useStorageApi); + final Method method = + useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API; + + final int failFrom = 10; + final String shouldFailName = "failme"; + + List goodRows = + Lists.newArrayList( + new SchemaPojo("a", 1), + new SchemaPojo("b", 2), + new SchemaPojo("c", 10), + new SchemaPojo("d", 11), + new SchemaPojo(shouldFailName, 1)); + + String nameField = "name"; + String numberField = "number"; + Function shouldFailRow = + (Function & Serializable) + tr -> + shouldFailName.equals(tr.get(nameField)) + || (Integer.valueOf((String) tr.get(numberField)) >= failFrom); + fakeDatasetService.setShouldFailRow(shouldFailRow); + + SerializableFunction formatRecordOnFailureFunction = + input -> { + TableRow failedTableRow = new TableRow().set("testFailureFunctionField", "testValue"); + failedTableRow.set("originalName", input.name); + failedTableRow.set("originalNumber", input.number); + return failedTableRow; + }; + + WriteResult result = + p.apply(Create.of(goodRows)) + .apply( + BigQueryIO.write() + .to("project-id:dataset-id.table") + .withMethod(method) + .useBeamSchema() + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()) + .withPropagateSuccessfulStorageApiWrites(true) + .withTestServices(fakeBqServices) + .withFormatRecordOnFailureFunction(formatRecordOnFailureFunction) + .withoutValidation()); + + PCollection deadRows = + result + .getFailedStorageApiInserts() + .apply( + MapElements.into(TypeDescriptor.of(TableRow.class)) + .via(BigQueryStorageApiInsertError::getRow)); + PCollection successfulRows = result.getSuccessfulStorageApiInserts(); + + List expectedFailedRows = + goodRows.stream() + .filter(pojo -> shouldFailName.equals(pojo.name) || pojo.number >= failFrom) + .map(formatRecordOnFailureFunction::apply) + .collect(Collectors.toList()); + PAssert.that(deadRows).containsInAnyOrder(expectedFailedRows); + PAssert.that(successfulRows) + .containsInAnyOrder( + Iterables.toArray( + Iterables.filter( + goodRows.stream() + .map( + pojo -> { + TableRow tableRow = new TableRow(); + tableRow.set(nameField, pojo.name); + tableRow.set(numberField, String.valueOf(pojo.number)); + return tableRow; + }) + .collect(Collectors.toList()), + r -> !shouldFailRow.apply(r)), + TableRow.class)); + p.run(); + + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table"), + containsInAnyOrder( + Iterables.toArray( + Iterables.filter( + goodRows.stream() + .map( + pojo -> { + TableRow tableRow = new TableRow(); + tableRow.set(nameField, pojo.name); + tableRow.set(numberField, String.valueOf(pojo.number)); + return tableRow; + }) + .collect(Collectors.toList()), + r -> !shouldFailRow.apply(r)), + TableRow.class))); + } + + @Test + public void testStorageApiErrorsWriteGenericRecord() throws Exception { + assumeTrue(useStorageApi); + final Method method = + useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API; + + final long failFrom = 10L; + List goodRows = LongStream.range(0, 20).boxed().collect(Collectors.toList()); + + String fieldName = "number"; + Function shouldFailRow = + (Function & Serializable) + tr -> (Long.valueOf((String) tr.get(fieldName))) >= failFrom; + fakeDatasetService.setShouldFailRow(shouldFailRow); + + SerializableFunction formatRecordOnFailureFunction = + input -> { + TableRow failedTableRow = new TableRow().set("testFailureFunctionField", "testValue"); + failedTableRow.set("originalElement", input); + return failedTableRow; + }; + + WriteResult result = + p.apply(Create.of(goodRows)) + .apply( + BigQueryIO.write() + .to("project-id:dataset-id.table") + .withMethod(method) + .withAvroFormatFunction( + (SerializableFunction, GenericRecord>) + input -> + new GenericRecordBuilder(avroSchema) + .set(fieldName, input.getElement()) + .build()) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName(fieldName).setType("INTEGER")))) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()) + .withPropagateSuccessfulStorageApiWrites(true) + .withTestServices(fakeBqServices) + .withFormatRecordOnFailureFunction(formatRecordOnFailureFunction) + .withoutValidation()); + + PCollection deadRows = + result + .getFailedStorageApiInserts() + .apply( + MapElements.into(TypeDescriptor.of(TableRow.class)) + .via(BigQueryStorageApiInsertError::getRow)); + PCollection successfulRows = result.getSuccessfulStorageApiInserts(); + + List expectedFailedRows = + goodRows.stream() + .filter(l -> l >= failFrom) + .map(formatRecordOnFailureFunction::apply) + .collect(Collectors.toList()); + PAssert.that(deadRows).containsInAnyOrder(expectedFailedRows); + PAssert.that(successfulRows) + .containsInAnyOrder( + Iterables.toArray( + Iterables.filter( + goodRows.stream() + .map( + l -> { + TableRow tableRow = new TableRow(); + tableRow.set(fieldName, String.valueOf(l)); + return tableRow; + }) + .collect(Collectors.toList()), + r -> !shouldFailRow.apply(r)), + TableRow.class)); + p.run(); + + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table"), + containsInAnyOrder( + Iterables.toArray( + Iterables.filter( + goodRows.stream() + .map( + l -> { + TableRow tableRow = new TableRow(); + tableRow.set(fieldName, String.valueOf(l)); + return tableRow; + }) + .collect(Collectors.toList()), + r -> !shouldFailRow.apply(r)), + TableRow.class))); + } + + @Test + public void testStorageApiErrorsWriteTableRows() throws Exception { assumeTrue(useStorageApi); final Method method = useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API; @@ -3132,6 +3438,22 @@ public void testStorageApiErrors() throws Exception { tr -> tr.containsKey("name") && tr.get("name").equals(failValue); fakeDatasetService.setShouldFailRow(shouldFailRow); + SerializableFunction formatRecordOnFailureFunction = + input -> { + TableRow failedTableRow = new TableRow().set("testFailureFunctionField", "testValue"); + if (input != null) { + Object name = input.get("name"); + if (name != null) { + failedTableRow.set("name", name); + } + Object number = input.get("number"); + if (number != null) { + failedTableRow.set("number", number); + } + } + return failedTableRow; + }; + WriteResult result = p.apply(Create.of(Iterables.concat(goodRows, badRows))) .apply( @@ -3143,6 +3465,7 @@ public void testStorageApiErrors() throws Exception { .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()) .withPropagateSuccessfulStorageApiWrites(true) .withTestServices(fakeBqServices) + .withFormatRecordOnFailureFunction(formatRecordOnFailureFunction) .withoutValidation()); PCollection deadRows = @@ -3153,9 +3476,14 @@ public void testStorageApiErrors() throws Exception { .via(BigQueryStorageApiInsertError::getRow)); PCollection successfulRows = result.getSuccessfulStorageApiInserts(); - PAssert.that(deadRows) - .containsInAnyOrder( - Iterables.concat(badRows, Iterables.filter(goodRows, shouldFailRow::apply))); + List expectedFailedRows = + badRows.stream().map(formatRecordOnFailureFunction::apply).collect(Collectors.toList()); + expectedFailedRows.addAll( + goodRows.stream() + .filter(shouldFailRow::apply) + .map(formatRecordOnFailureFunction::apply) + .collect(Collectors.toList())); + PAssert.that(deadRows).containsInAnyOrder(expectedFailedRows); PAssert.that(successfulRows) .containsInAnyOrder( Iterables.toArray(