diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT.java index 4589f79f1aaa6..214d2745f03e0 100644 --- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT.java +++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryStreamingLT.java @@ -428,14 +428,14 @@ public CrashingBigQueryServices(Integer crashIntervalSeconds) { } @Override - public DatasetService getDatasetService(BigQueryOptions options) { - return new CrashingDatasetService(options); + public WriteStreamService getWriteStreamService(BigQueryOptions options) { + return new CrashingWriteStreamService(options); } - private class CrashingDatasetService extends BigQueryServicesImpl.DatasetServiceImpl { + private class CrashingWriteStreamService extends BigQueryServicesImpl.WriteStreamServiceImpl { private Instant lastCrash; - public CrashingDatasetService(BigQueryOptions bqOptions) { + public CrashingWriteStreamService(BigQueryOptions bqOptions) { super(bqOptions); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java index ee21e1185f37b..3094af5855e95 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java @@ -105,7 +105,7 @@ public AppendClientInfo withNoAppendClient() { } public AppendClientInfo withAppendClient( - BigQueryServices.DatasetService datasetService, + BigQueryServices.WriteStreamService writeStreamService, Supplier getStreamName, boolean useConnectionPool, AppendRowsRequest.MissingValueInterpretation missingValueInterpretation) @@ -117,7 +117,7 @@ public AppendClientInfo withAppendClient( return toBuilder() .setStreamName(streamName) .setStreamAppendClient( - datasetService.getStreamAppendClient( + writeStreamService.getStreamAppendClient( streamName, getDescriptor(), useConnectionPool, missingValueInterpretation)) .build(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index c9c96eb35f3fd..a8e1adf643ab0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -63,6 +63,9 @@ public interface BigQueryServices extends Serializable { /** Returns a real, mock, or fake {@link DatasetService}. */ DatasetService getDatasetService(BigQueryOptions bqOptions); + /** Returns a real, mock, or fake {@link WriteStreamService}. */ + WriteStreamService getWriteStreamService(BigQueryOptions bqOptions); + /** Returns a real, mock, or fake {@link StorageClient}. */ StorageClient getStorageClient(BigQueryOptions bqOptions) throws IOException; @@ -112,7 +115,7 @@ JobStatistics dryRunQuery( } /** An interface to get, create and delete Cloud BigQuery datasets and tables. */ - public interface DatasetService extends AutoCloseable { + interface DatasetService extends AutoCloseable { // maps the values at // https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get#TableMetadataView @@ -201,7 +204,10 @@ long insertAll( /** Patch BigQuery {@link Table} description. */ Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription) throws IOException, InterruptedException; + } + /** An interface to get, create and flush Cloud BigQuery STORAGE API write streams. */ + interface WriteStreamService extends AutoCloseable { /** Create a Write Stream for use with the Storage Write API. */ WriteStream createWriteStream(String tableUrn, WriteStream.Type type) throws IOException, InterruptedException; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index fb288fc1d512d..bb3b99f6fcd52 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -197,6 +197,11 @@ public DatasetService getDatasetService(BigQueryOptions options) { return new DatasetServiceImpl(options); } + @Override + public WriteStreamService getWriteStreamService(BigQueryOptions options) { + return new WriteStreamServiceImpl(options); + } + @Override public StorageClient getStorageClient(BigQueryOptions options) throws IOException { return new StorageClientImpl(options); @@ -563,64 +568,43 @@ public static class DatasetServiceImpl implements DatasetService { private final ApiErrorExtractor errorExtractor; private final Bigquery client; - private final @Nullable BigQueryWriteClient newWriteClient; private final PipelineOptions options; private final long maxRowsPerBatch; private final long maxRowBatchSize; - private final long storageWriteMaxInflightRequests; - private final long storageWriteMaxInflightBytes; // aggregate the total time spent in exponential backoff private final Counter throttlingMsecs = Metrics.counter(DatasetServiceImpl.class, "throttling-msecs"); private @Nullable BoundedExecutorService executor; - private final BigQueryIOMetadata bqIOMetadata; @VisibleForTesting - DatasetServiceImpl( - Bigquery client, @Nullable BigQueryWriteClient newWriteClient, PipelineOptions options) { + DatasetServiceImpl(Bigquery client, PipelineOptions options) { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); this.errorExtractor = new ApiErrorExtractor(); this.client = client; - this.newWriteClient = newWriteClient; this.options = options; this.maxRowsPerBatch = bqOptions.getMaxStreamingRowsToBatch(); this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize(); - this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests(); - this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes(); - this.bqIOMetadata = BigQueryIOMetadata.create(); this.executor = null; } @VisibleForTesting - DatasetServiceImpl( - Bigquery client, - BigQueryWriteClient newWriteClient, - PipelineOptions options, - long maxRowsPerBatch) { + DatasetServiceImpl(Bigquery client, PipelineOptions options, long maxRowsPerBatch) { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); this.errorExtractor = new ApiErrorExtractor(); this.client = client; - this.newWriteClient = newWriteClient; this.options = options; this.maxRowsPerBatch = maxRowsPerBatch; this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize(); - this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests(); - this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes(); - this.bqIOMetadata = BigQueryIOMetadata.create(); this.executor = null; } public DatasetServiceImpl(BigQueryOptions bqOptions) { this.errorExtractor = new ApiErrorExtractor(); this.client = newBigQueryClient(bqOptions).build(); - this.newWriteClient = newBigQueryWriteClient(bqOptions); this.options = bqOptions; this.maxRowsPerBatch = bqOptions.getMaxStreamingRowsToBatch(); this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize(); - this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests(); - this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes(); - this.bqIOMetadata = BigQueryIOMetadata.create(); this.executor = null; } @@ -1344,6 +1328,38 @@ public Table patchTableDescription( ALWAYS_RETRY); } + @Override + public void close() throws Exception { + // Nothing to close + } + } + + @VisibleForTesting + public static class WriteStreamServiceImpl implements WriteStreamService { + private final BigQueryWriteClient newWriteClient; + private final long storageWriteMaxInflightRequests; + private final long storageWriteMaxInflightBytes; + private final BigQueryIOMetadata bqIOMetadata; + private final PipelineOptions options; + + @VisibleForTesting + WriteStreamServiceImpl(BigQueryWriteClient newWriteClient, PipelineOptions options) { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + this.newWriteClient = newWriteClient; + this.options = options; + this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests(); + this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes(); + this.bqIOMetadata = BigQueryIOMetadata.create(); + } + + public WriteStreamServiceImpl(BigQueryOptions bqOptions) { + this.newWriteClient = newBigQueryWriteClient(bqOptions); + this.options = bqOptions; + this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests(); + this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes(); + this.bqIOMetadata = BigQueryIOMetadata.create(); + } + @Override public WriteStream createWriteStream(String tableUrn, WriteStream.Type type) throws IOException { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java index 853a63fca8cc8..cbe018ba739c1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java @@ -25,7 +25,7 @@ import java.util.Collection; import java.util.Map; import java.util.Set; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.WriteStreamService; import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context; import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType; import org.apache.beam.sdk.metrics.Counter; @@ -64,27 +64,29 @@ class StorageApiFinalizeWritesDoFn extends DoFn, Void> { private Map> commitStreams; private final BigQueryServices bqServices; - private transient @Nullable DatasetService datasetService; + private transient @Nullable WriteStreamService writeStreamService; public StorageApiFinalizeWritesDoFn(BigQueryServices bqServices) { this.bqServices = bqServices; this.commitStreams = Maps.newHashMap(); - this.datasetService = null; + this.writeStreamService = null; } - private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { - if (datasetService == null) { - datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); + private WriteStreamService getWriteStreamService(PipelineOptions pipelineOptions) + throws IOException { + if (writeStreamService == null) { + writeStreamService = + bqServices.getWriteStreamService(pipelineOptions.as(BigQueryOptions.class)); } - return datasetService; + return writeStreamService; } @Teardown public void onTeardown() { try { - if (datasetService != null) { - datasetService.close(); - datasetService = null; + if (writeStreamService != null) { + writeStreamService.close(); + writeStreamService = null; } } catch (Exception e) { throw new RuntimeException(e); @@ -101,7 +103,7 @@ public void process(PipelineOptions pipelineOptions, @Element KV throws Exception { String tableId = element.getKey(); String streamId = element.getValue(); - DatasetService datasetService = getDatasetService(pipelineOptions); + WriteStreamService writeStreamService = getWriteStreamService(pipelineOptions); RetryManager> retryManager = new RetryManager<>( @@ -113,7 +115,7 @@ public void process(PipelineOptions pipelineOptions, @Element KV retryManager.addOperation( c -> { finalizeOperationsSent.inc(); - return datasetService.finalizeWriteStream(streamId); + return writeStreamService.finalizeWriteStream(streamId); }, contexts -> { RetryManager.Operation.Context firstContext = @@ -145,7 +147,7 @@ public void process(PipelineOptions pipelineOptions, @Element KV @FinishBundle public void finishBundle(PipelineOptions pipelineOptions) throws Exception { - DatasetService datasetService = getDatasetService(pipelineOptions); + WriteStreamService writeStreamService = getWriteStreamService(pipelineOptions); for (Map.Entry> entry : commitStreams.entrySet()) { final String tableId = entry.getKey(); final Collection streamNames = entry.getValue(); @@ -161,7 +163,7 @@ public void finishBundle(PipelineOptions pipelineOptions) throws Exception { Iterable streamsToCommit = Iterables.filter(streamNames, s -> !alreadyCommittedStreams.contains(s)); batchCommitOperationsSent.inc(); - return datasetService.commitWriteStreams(tableId, streamsToCommit); + return writeStreamService.commitWriteStreams(tableId, streamsToCommit); }, contexts -> { RetryManager.Operation.Context firstContext = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java index 333a0c0b36bf6..dec86c3360b07 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java @@ -26,7 +26,7 @@ import java.time.Instant; import java.util.Objects; import javax.annotation.Nullable; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.WriteStreamService; import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context; import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType; import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn.Operation; @@ -50,7 +50,7 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn, private static final Logger LOG = LoggerFactory.getLogger(StorageApiFlushAndFinalizeDoFn.class); private final BigQueryServices bqServices; - private transient @Nullable DatasetService datasetService = null; + private transient @Nullable WriteStreamService writeStreamService = null; private final Counter flushOperationsSent = Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsSent"); private final Counter flushOperationsSucceeded = @@ -112,19 +112,21 @@ public StorageApiFlushAndFinalizeDoFn(BigQueryServices bqServices) { this.bqServices = bqServices; } - private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { - if (datasetService == null) { - datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); + private WriteStreamService getWriteStreamService(PipelineOptions pipelineOptions) + throws IOException { + if (writeStreamService == null) { + writeStreamService = + bqServices.getWriteStreamService(pipelineOptions.as(BigQueryOptions.class)); } - return datasetService; + return writeStreamService; } @Teardown public void onTeardown() { try { - if (datasetService != null) { - datasetService.close(); - datasetService = null; + if (writeStreamService != null) { + writeStreamService.close(); + writeStreamService = null; } } catch (Exception e) { throw new RuntimeException(e); @@ -136,7 +138,7 @@ public void process(PipelineOptions pipelineOptions, @Element KV= 0) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 13c180a534f16..8afcbd36d732c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StreamAppendClient; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.WriteStreamService; import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType; import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter; import org.apache.beam.sdk.metrics.Counter; @@ -271,7 +272,7 @@ class DestinationState { private long currentOffset = 0; private List pendingMessages; private List pendingTimestamps; - private transient @Nullable DatasetService maybeDatasetService; + private transient @Nullable WriteStreamService maybeWriteStreamService; private final Counter recordsAppended = Metrics.counter(WriteRecordsDoFn.class, "recordsAppended"); private final Counter appendFailures = @@ -298,7 +299,7 @@ public DestinationState( String tableUrn, String shortTableUrn, MessageConverter messageConverter, - DatasetService datasetService, + WriteStreamService writeStreamService, boolean useDefaultStream, int streamAppendClientCount, boolean usingMultiplexing, @@ -310,7 +311,7 @@ public DestinationState( this.shortTableUrn = shortTableUrn; this.pendingMessages = Lists.newArrayList(); this.pendingTimestamps = Lists.newArrayList(); - this.maybeDatasetService = datasetService; + this.maybeWriteStreamService = writeStreamService; this.useDefaultStream = useDefaultStream; this.initialTableSchema = messageConverter.getTableSchema(); this.initialDescriptor = messageConverter.getDescriptor(includeCdcColumns); @@ -356,7 +357,7 @@ String getOrCreateStreamName() throws Exception { () -> { if (!useDefaultStream) { this.streamName = - Preconditions.checkStateNotNull(maybeDatasetService) + Preconditions.checkStateNotNull(maybeWriteStreamService) .createWriteStream(tableUrn, Type.PENDING) .getName(); this.currentOffset = 0; @@ -397,7 +398,7 @@ AppendClientInfo generateClient(@Nullable TableSchema updatedSchema) throws Exce appendClientInfo .get() .withAppendClient( - Preconditions.checkStateNotNull(maybeDatasetService), + Preconditions.checkStateNotNull(maybeWriteStreamService), () -> streamName, usingMultiplexing, defaultMissingValueInterpretation)); @@ -438,7 +439,8 @@ SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable TableSchema u if (autoUpdateSchema) { @Nullable WriteStream writeStream = - Preconditions.checkStateNotNull(maybeDatasetService).getWriteStream(streamName); + Preconditions.checkStateNotNull(maybeWriteStreamService) + .getWriteStream(streamName); if (writeStream != null && writeStream.hasTableSchema()) { TableSchema updatedFromStream = writeStream.getTableSchema(); currentSchema.set(updatedFromStream); @@ -870,6 +872,7 @@ void postFlush() { private @Nullable Map destinations = Maps.newHashMap(); private final TwoLevelMessageConverterCache messageConverters; private transient @Nullable DatasetService maybeDatasetService; + private transient @Nullable WriteStreamService maybeWriteStreamService; private int numPendingRecords = 0; private int numPendingRecordBytes = 0; private final int flushThresholdBytes; @@ -977,6 +980,14 @@ private DatasetService initializeDatasetService(PipelineOptions pipelineOptions) return maybeDatasetService; } + private WriteStreamService initializeWriteStreamService(PipelineOptions pipelineOptions) { + if (maybeWriteStreamService == null) { + maybeWriteStreamService = + bqServices.getWriteStreamService(pipelineOptions.as(BigQueryOptions.class)); + } + return maybeWriteStreamService; + } + @StartBundle public void startBundle() throws IOException { destinations = Maps.newHashMap(); @@ -989,6 +1000,7 @@ DestinationState createDestinationState( DestinationT destination, boolean useCdc, DatasetService datasetService, + WriteStreamService writeStreamService, BigQueryOptions bigQueryOptions) { TableDestination tableDestination1 = dynamicDestinations.getTable(destination); checkArgument( @@ -1019,7 +1031,7 @@ DestinationState createDestinationState( tableDestination1.getTableUrn(bigQueryOptions), tableDestination1.getShortTableUrn(), messageConverter, - datasetService, + writeStreamService, useDefaultStream, streamAppendClientCount, bigQueryOptions.getUseStorageApiConnectionPool(), @@ -1040,6 +1052,8 @@ public void process( MultiOutputReceiver o) throws Exception { DatasetService initializedDatasetService = initializeDatasetService(pipelineOptions); + WriteStreamService initializedWriteStreamService = + initializeWriteStreamService(pipelineOptions); dynamicDestinations.setSideInputAccessorFromProcessContext(c); DestinationState state = Preconditions.checkStateNotNull(destinations) @@ -1051,6 +1065,7 @@ public void process( destination, usesCdc, initializedDatasetService, + initializedWriteStreamService, pipelineOptions.as(BigQueryOptions.class))); OutputReceiver failedRowsReceiver = o.get(failedRowsTag); @@ -1116,6 +1131,10 @@ public void outputWithTimestamp(TableRow output, org.joda.time.Instant timestamp public void teardown() { destinations = null; try { + if (maybeWriteStreamService != null) { + maybeWriteStreamService.close(); + maybeWriteStreamService = null; + } if (maybeDatasetService != null) { maybeDatasetService.close(); maybeDatasetService = null; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index d3042984638f0..8cf8ad0ee025a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -57,6 +57,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StreamAppendClient; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.WriteStreamService; import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType; import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn.Operation; import org.apache.beam.sdk.metrics.Counter; @@ -319,6 +320,8 @@ class WriteRecordsDoFn private transient @Nullable DatasetService datasetServiceInternal = null; + private transient @Nullable WriteStreamService writeStreamServiceInternal = null; + // Stores the current stream for this key. @StateId("streamName") private final StateSpec> streamNameSpec = StateSpecs.value(); @@ -358,7 +361,7 @@ String getOrCreateStream( ValueState streamName, ValueState streamOffset, Timer streamIdleTimer, - DatasetService datasetService, + WriteStreamService writeStreamService, Callable tryCreateTable) { try { final @Nullable String streamValue = streamName.read(); @@ -367,7 +370,7 @@ String getOrCreateStream( // In a buffered stream, data is only visible up to the offset to which it was flushed. CreateTableHelpers.createTableWrapper( () -> { - stream.set(datasetService.createWriteStream(tableId, Type.BUFFERED).getName()); + stream.set(writeStreamService.createWriteStream(tableId, Type.BUFFERED).getName()); return null; }, tryCreateTable); @@ -395,9 +398,22 @@ private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws return datasetServiceInternal; } + private WriteStreamService getWriteStreamService(PipelineOptions pipelineOptions) + throws IOException { + if (writeStreamServiceInternal == null) { + writeStreamServiceInternal = + bqServices.getWriteStreamService(pipelineOptions.as(BigQueryOptions.class)); + } + return writeStreamServiceInternal; + } + @Teardown public void onTeardown() { try { + if (writeStreamServiceInternal != null) { + writeStreamServiceInternal.close(); + writeStreamServiceInternal = null; + } if (datasetServiceInternal != null) { datasetServiceInternal.close(); datasetServiceInternal = null; @@ -442,6 +458,7 @@ public void process( final String tableId = tableDestination.getTableUrn(bigQueryOptions); final String shortTableId = tableDestination.getShortTableUrn(); final DatasetService datasetService = getDatasetService(pipelineOptions); + final WriteStreamService writeStreamService = getWriteStreamService(pipelineOptions); Coder destinationCoder = dynamicDestinations.getDestinationCoder(); Callable tryCreateTable = @@ -462,7 +479,7 @@ public void process( Supplier getOrCreateStream = () -> getOrCreateStream( - tableId, streamName, streamOffset, idleTimer, datasetService, tryCreateTable); + tableId, streamName, streamOffset, idleTimer, writeStreamService, tryCreateTable); Callable getAppendClientInfo = () -> { @Nullable TableSchema tableSchema; @@ -500,7 +517,7 @@ public void process( client.close(); })) .withAppendClient( - datasetService, + writeStreamService, getOrCreateStream, false, defaultMissingValueInterpretation); @@ -569,7 +586,7 @@ public void process( appendClientInfo .get() .withAppendClient( - datasetService, + writeStreamService, getOrCreateStream, false, defaultMissingValueInterpretation)); @@ -618,7 +635,7 @@ public void process( appendClientInfo .get() .withAppendClient( - datasetService, + writeStreamService, getOrCreateStream, false, defaultMissingValueInterpretation)); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java index 5e6e3ac7ed079..c258ce4ab7fb0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java @@ -43,7 +43,7 @@ }) public class FakeBigQueryServices implements BigQueryServices { private JobService jobService; - private DatasetService datasetService; + private FakeDatasetService datasetService; private StorageClient storageClient; public FakeBigQueryServices withJobService(JobService jobService) { @@ -71,6 +71,11 @@ public DatasetService getDatasetService(BigQueryOptions bqOptions) { return datasetService; } + @Override + public WriteStreamService getWriteStreamService(BigQueryOptions bqOptions) { + return datasetService; + } + @Override public StorageClient getStorageClient(BigQueryOptions bqOptions) { return storageClient; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index f26c38d1e3c86..6a50127acd8fc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -65,6 +65,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StreamAppendClient; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.WriteStreamService; import org.apache.beam.sdk.io.gcp.bigquery.ErrorContainer; import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy; import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy.Context; @@ -84,7 +85,7 @@ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) -public class FakeDatasetService implements DatasetService, Serializable { +public class FakeDatasetService implements DatasetService, WriteStreamService, Serializable { // Table information must be static, as each ParDo will get a separate instance of // FakeDatasetServices, and they must all modify the same storage. static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table< diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 474aea020f697..a8e1ad52237c7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -473,8 +473,7 @@ public void testGetTableSucceeds() throws Exception { }); BigQueryServicesImpl.DatasetServiceImpl datasetService = - new BigQueryServicesImpl.DatasetServiceImpl( - bigquery, null, PipelineOptionsFactory.create()); + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); Table table = datasetService.getTable( @@ -509,7 +508,7 @@ public void testGetTableNullProjectSucceeds() throws Exception { options.setBigQueryProject("projectId"); BigQueryServicesImpl.DatasetServiceImpl datasetService = - new BigQueryServicesImpl.DatasetServiceImpl(bigquery, null, options); + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, options); Table table = datasetService.getTable( @@ -528,8 +527,7 @@ public void testGetTableNotFound() throws IOException, InterruptedException { }); BigQueryServicesImpl.DatasetServiceImpl datasetService = - new BigQueryServicesImpl.DatasetServiceImpl( - bigquery, null, PipelineOptionsFactory.create()); + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); TableReference tableRef = new TableReference() @@ -562,8 +560,7 @@ public void testGetTableThrows() throws Exception { thrown.expectMessage(String.format("Unable to get table: %s", tableRef.getTableId())); BigQueryServicesImpl.DatasetServiceImpl datasetService = - new BigQueryServicesImpl.DatasetServiceImpl( - bigquery, null, PipelineOptionsFactory.create()); + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); datasetService.getTable( tableRef, Collections.emptyList(), null, BackOff.STOP_BACKOFF, Sleeper.DEFAULT); } @@ -593,8 +590,7 @@ public void testIsTableEmptySucceeds() throws Exception { }); BigQueryServicesImpl.DatasetServiceImpl datasetService = - new BigQueryServicesImpl.DatasetServiceImpl( - bigquery, null, PipelineOptionsFactory.create()); + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); assertFalse(datasetService.isTableEmpty(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT)); @@ -610,8 +606,7 @@ public void testIsTableEmptyNoRetryForNotFound() throws IOException, Interrupted }); BigQueryServicesImpl.DatasetServiceImpl datasetService = - new BigQueryServicesImpl.DatasetServiceImpl( - bigquery, null, PipelineOptionsFactory.create()); + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); TableReference tableRef = new TableReference() @@ -644,8 +639,7 @@ public void testIsTableEmptyThrows() throws Exception { .setTableId("tableId"); BigQueryServicesImpl.DatasetServiceImpl datasetService = - new BigQueryServicesImpl.DatasetServiceImpl( - bigquery, null, PipelineOptionsFactory.create()); + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); thrown.expect(IOException.class); thrown.expectMessage(String.format("Unable to list table data: %s", tableRef.getTableId())); @@ -715,7 +709,7 @@ public void testInsertRateLimitRetry() throws Exception { }); DatasetServiceImpl dataService = - new DatasetServiceImpl(bigquery, null, PipelineOptionsFactory.create()); + new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); dataService.insertAll( ref, rows, @@ -760,7 +754,7 @@ public void testInsertQuotaExceededRetry() throws Exception { }); DatasetServiceImpl dataService = - new DatasetServiceImpl(bigquery, null, PipelineOptionsFactory.create()); + new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); dataService.insertAll( ref, rows, @@ -817,7 +811,7 @@ public void testInsertStoppedRetry() throws Exception { thrown.expectMessage("quotaExceeded"); DatasetServiceImpl dataService = - new DatasetServiceImpl(bigquery, null, PipelineOptionsFactory.create()); + new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); dataService.insertAll( ref, rows, @@ -876,7 +870,7 @@ public void testInsertRetrySelectRows() throws Exception { }); DatasetServiceImpl dataService = - new DatasetServiceImpl(bigquery, null, PipelineOptionsFactory.create()); + new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); dataService.insertAll( ref, rows, @@ -933,7 +927,7 @@ public void testInsertWithinRowCountLimits() throws Exception { PipelineOptionsFactory.fromArgs("--maxStreamingRowsToBatch=1").create(); options.as(GcsOptions.class).setExecutorService(Executors.newSingleThreadExecutor()); - DatasetServiceImpl dataService = new DatasetServiceImpl(bigquery, null, options); + DatasetServiceImpl dataService = new DatasetServiceImpl(bigquery, options); dataService.insertAll( ref, rows, @@ -983,7 +977,7 @@ public void testInsertWithinRequestByteSizeLimitsErrorsOut() throws Exception { DatasetServiceImpl dataService = new DatasetServiceImpl( - bigquery, null, PipelineOptionsFactory.fromArgs("--maxStreamingBatchSize=15").create()); + bigquery, PipelineOptionsFactory.fromArgs("--maxStreamingBatchSize=15").create()); List> failedInserts = Lists.newArrayList(); List> successfulRows = Lists.newArrayList(); RuntimeException e = @@ -1031,7 +1025,7 @@ public void testInsertRetryTransientsAboveRequestByteSizeLimits() throws Excepti DatasetServiceImpl dataService = new DatasetServiceImpl( - bigquery, null, PipelineOptionsFactory.fromArgs("--maxStreamingBatchSize=15").create()); + bigquery, PipelineOptionsFactory.fromArgs("--maxStreamingBatchSize=15").create()); List> failedInserts = Lists.newArrayList(); List> successfulRows = Lists.newArrayList(); dataService.insertAll( @@ -1086,7 +1080,7 @@ public void testInsertWithinRequestByteSizeLimits() throws Exception { PipelineOptionsFactory.fromArgs("--maxStreamingBatchSize=15").create(); options.as(GcsOptions.class).setExecutorService(Executors.newSingleThreadExecutor()); - DatasetServiceImpl dataService = new DatasetServiceImpl(bigquery, null, options); + DatasetServiceImpl dataService = new DatasetServiceImpl(bigquery, options); dataService.insertAll( ref, rows, @@ -1150,7 +1144,7 @@ public void testInsertFailsGracefully() throws Exception { row0FailureResponseFunction); DatasetServiceImpl dataService = - new DatasetServiceImpl(bigquery, null, PipelineOptionsFactory.create()); + new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); // Expect it to fail. try { @@ -1209,7 +1203,7 @@ public void testFailInsertOtherRetry() throws Exception { }); DatasetServiceImpl dataService = - new DatasetServiceImpl(bigquery, null, PipelineOptionsFactory.create()); + new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); thrown.expect(RuntimeException.class); thrown.expectMessage("actually forbidden"); try { @@ -1258,7 +1252,7 @@ public void testInsertTimeoutLog() throws Exception { }); DatasetServiceImpl dataService = - new DatasetServiceImpl(bigquery, null, PipelineOptionsFactory.create()); + new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); RuntimeException e = assertThrows( RuntimeException.class, @@ -1340,7 +1334,7 @@ public void testInsertRetryPolicy() throws InterruptedException, IOException { }); DatasetServiceImpl dataService = - new DatasetServiceImpl(bigquery, null, PipelineOptionsFactory.create()); + new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); List> failedInserts = Lists.newArrayList(); dataService.insertAll( @@ -1387,7 +1381,7 @@ public void testSkipInvalidRowsIgnoreUnknownIgnoreInsertIdsValuesStreaming() setupMockResponses(allRowsSucceededResponseFunction, allRowsSucceededResponseFunction); DatasetServiceImpl dataService = - new DatasetServiceImpl(bigquery, null, PipelineOptionsFactory.create()); + new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); // First, test with all flags disabled dataService.insertAll( @@ -1501,8 +1495,7 @@ public void testCreateTableSucceeds() throws IOException { }); BigQueryServicesImpl.DatasetServiceImpl services = - new BigQueryServicesImpl.DatasetServiceImpl( - bigquery, null, PipelineOptionsFactory.create()); + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); Table ret = services.tryCreateTable( testTable, new RetryBoundedBackOff(BackOff.ZERO_BACKOFF, 0), Sleeper.DEFAULT); @@ -1535,8 +1528,7 @@ public void testCreateTableDoesNotRetry() throws IOException { thrown.expectMessage("actually forbidden"); BigQueryServicesImpl.DatasetServiceImpl services = - new BigQueryServicesImpl.DatasetServiceImpl( - bigquery, null, PipelineOptionsFactory.create()); + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); try { services.tryCreateTable( testTable, new RetryBoundedBackOff(BackOff.ZERO_BACKOFF, 3), Sleeper.DEFAULT); @@ -1572,8 +1564,7 @@ public void testCreateTableSucceedsAlreadyExists() throws IOException { }); BigQueryServicesImpl.DatasetServiceImpl services = - new BigQueryServicesImpl.DatasetServiceImpl( - bigquery, null, PipelineOptionsFactory.create()); + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); Table ret = services.tryCreateTable( testTable, new RetryBoundedBackOff(BackOff.ZERO_BACKOFF, 0), Sleeper.DEFAULT); @@ -1604,8 +1595,7 @@ public void testCreateTableRetry() throws IOException { }); BigQueryServicesImpl.DatasetServiceImpl services = - new BigQueryServicesImpl.DatasetServiceImpl( - bigquery, null, PipelineOptionsFactory.create()); + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); Table ret = services.tryCreateTable( testTable, new RetryBoundedBackOff(BackOff.ZERO_BACKOFF, 3), Sleeper.DEFAULT); @@ -1651,7 +1641,7 @@ public void testSimpleErrorRetrieval() throws InterruptedException, IOException }); DatasetServiceImpl dataService = - new DatasetServiceImpl(bigquery, null, PipelineOptionsFactory.create()); + new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); List> failedInserts = Lists.newArrayList(); dataService.insertAll( @@ -1711,7 +1701,7 @@ public void testExtendedErrorRetrieval() throws InterruptedException, IOExceptio }); DatasetServiceImpl dataService = - new DatasetServiceImpl(bigquery, null, PipelineOptionsFactory.create()); + new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); List> failedInserts = Lists.newArrayList(); dataService.insertAll( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java index cef93913401e0..c71946a546763 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java @@ -170,7 +170,7 @@ public void testTableGet() throws InterruptedException, IOException { onTableList(dataList); BigQueryServicesImpl.DatasetServiceImpl services = - new BigQueryServicesImpl.DatasetServiceImpl(mockClient, null, options); + new BigQueryServicesImpl.DatasetServiceImpl(mockClient, options); services.getTable( new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table")); @@ -190,7 +190,7 @@ public void testInsertAll() throws Exception { onInsertAll(errorsIndices); TableReference ref = BigQueryHelpers.parseTableSpec("project:dataset.table"); - DatasetServiceImpl datasetService = new DatasetServiceImpl(mockClient, null, options, 5); + DatasetServiceImpl datasetService = new DatasetServiceImpl(mockClient, options, 5); List> rows = new ArrayList<>(); List ids = new ArrayList<>();