Split BigQuery DatasetService and WriteStreamService (apache#29604)
* Split BigQuery DatasetService and WriteStreamService

* Avoid unnecessary STORAGE_WRITE_API client creation and teardown
  when DatasetService is not used for the Storage Write API

* Fix override in test fixture

* Add missing datasetService.close() in DoFn teardown for completeness,
  though currently DatasetServiceImpl.close() is a no-op
Abacn authored Dec 6, 2023
1 parent f710795 commit aef2195
Showing 12 changed files with 165 additions and 107 deletions.
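
At the API level the change reads as follows. A minimal sketch, assuming code living in the same package as BigQueryServices (the diff below drops the `public` modifier from DatasetService, making it package-private); only the two getters, the AutoCloseable contract, and createWriteStream are taken from the diffs, while the method shape and names here are illustrative:

```java
// Hedged sketch: callers that previously fetched one DatasetService for both
// metadata calls and Storage Write API streams now obtain two independently
// closeable services. Flow and identifiers below are illustrative only.
static void useSplitServices(
    BigQueryServices bqServices, BigQueryOptions options, String tableUrn) throws Exception {
  // Metadata path: no BigQueryWriteClient is created or torn down.
  try (BigQueryServices.DatasetService datasets = bqServices.getDatasetService(options)) {
    // get/create/delete datasets and tables, insertAll, patchTableDescription, ...
  }
  // Storage Write API path: this service now owns the BigQueryWriteClient.
  try (BigQueryServices.WriteStreamService streams = bqServices.getWriteStreamService(options)) {
    streams.createWriteStream(tableUrn, WriteStream.Type.PENDING);
    // ... append via getStreamAppendClient, then flush / finalize / commit ...
  }
}
```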
@@ -428,14 +428,14 @@ public CrashingBigQueryServices(Integer crashIntervalSeconds) {
   }
 
   @Override
-  public DatasetService getDatasetService(BigQueryOptions options) {
-    return new CrashingDatasetService(options);
+  public WriteStreamService getWriteStreamService(BigQueryOptions options) {
+    return new CrashingWriteStreamService(options);
   }
 
-  private class CrashingDatasetService extends BigQueryServicesImpl.DatasetServiceImpl {
+  private class CrashingWriteStreamService extends BigQueryServicesImpl.WriteStreamServiceImpl {
     private Instant lastCrash;
 
-    public CrashingDatasetService(BigQueryOptions bqOptions) {
+    public CrashingWriteStreamService(BigQueryOptions bqOptions) {
       super(bqOptions);
     }
 
@@ -105,7 +105,7 @@ public AppendClientInfo withNoAppendClient() {
   }
 
   public AppendClientInfo withAppendClient(
-      BigQueryServices.DatasetService datasetService,
+      BigQueryServices.WriteStreamService writeStreamService,
       Supplier<String> getStreamName,
       boolean useConnectionPool,
       AppendRowsRequest.MissingValueInterpretation missingValueInterpretation)
@@ -117,7 +117,7 @@ public AppendClientInfo withAppendClient(
     return toBuilder()
         .setStreamName(streamName)
        .setStreamAppendClient(
-            datasetService.getStreamAppendClient(
+            writeStreamService.getStreamAppendClient(
                 streamName, getDescriptor(), useConnectionPool, missingValueInterpretation))
        .build();
   }
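
A hedged sketch of a call site after the rename; only the four-parameter signature of withAppendClient comes from the hunk above, and the helper method, stream name, and flag values are placeholders:

```java
// Sketch only: the withAppendClient signature is from the diff above; this
// helper and its argument values are assumed for illustration.
static AppendClientInfo attachAppendClient(
    AppendClientInfo info, BigQueryServices.WriteStreamService writeStreamService)
    throws Exception {
  return info.withAppendClient(
      writeStreamService, // previously a BigQueryServices.DatasetService
      () -> "projects/p/datasets/d/tables/t/streams/s", // lazy stream-name supplier
      false, // useConnectionPool
      AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
}
```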
@@ -63,6 +63,9 @@ public interface BigQueryServices extends Serializable {
   /** Returns a real, mock, or fake {@link DatasetService}. */
   DatasetService getDatasetService(BigQueryOptions bqOptions);
 
+  /** Returns a real, mock, or fake {@link WriteStreamService}. */
+  WriteStreamService getWriteStreamService(BigQueryOptions bqOptions);
+
   /** Returns a real, mock, or fake {@link StorageClient}. */
   StorageClient getStorageClient(BigQueryOptions bqOptions) throws IOException;
 
@@ -112,7 +115,7 @@ JobStatistics dryRunQuery(
   }
 
   /** An interface to get, create and delete Cloud BigQuery datasets and tables. */
-  public interface DatasetService extends AutoCloseable {
+  interface DatasetService extends AutoCloseable {
 
     // maps the values at
     // https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get#TableMetadataView
@@ -201,7 +204,10 @@ <T> long insertAll(
     /** Patch BigQuery {@link Table} description. */
     Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription)
         throws IOException, InterruptedException;
+  }
 
+  /** An interface to get, create and flush Cloud BigQuery STORAGE API write streams. */
+  interface WriteStreamService extends AutoCloseable {
     /** Create a Write Stream for use with the Storage Write API. */
     WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
         throws IOException, InterruptedException;
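
Because the interface gains a second getter, every BigQueryServices implementation, real or fake, now has to provide both services. A minimal sketch of such a test double (FakeDatasetService and FakeWriteStreamService are hypothetical stand-ins; only the two overridden signatures come from the interface above):

```java
// Hypothetical test double; only the overridden getter signatures are from the
// interface above. FakeDatasetService / FakeWriteStreamService are assumed names.
class TestBigQueryServices implements BigQueryServices {
  @Override
  public DatasetService getDatasetService(BigQueryOptions bqOptions) {
    return new FakeDatasetService(); // metadata only: no write client involved
  }

  @Override
  public WriteStreamService getWriteStreamService(BigQueryOptions bqOptions) {
    return new FakeWriteStreamService(); // owns all fake stream/append state
  }

  // getJobService, getStorageClient, and the remaining methods elided for brevity
}
```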
@@ -197,6 +197,11 @@ public DatasetService getDatasetService(BigQueryOptions options) {
     return new DatasetServiceImpl(options);
   }
 
+  @Override
+  public WriteStreamService getWriteStreamService(BigQueryOptions options) {
+    return new WriteStreamServiceImpl(options);
+  }
+
   @Override
   public StorageClient getStorageClient(BigQueryOptions options) throws IOException {
     return new StorageClientImpl(options);
@@ -563,64 +568,43 @@ public static class DatasetServiceImpl implements DatasetService {
 
     private final ApiErrorExtractor errorExtractor;
     private final Bigquery client;
-    private final @Nullable BigQueryWriteClient newWriteClient;
     private final PipelineOptions options;
     private final long maxRowsPerBatch;
     private final long maxRowBatchSize;
-    private final long storageWriteMaxInflightRequests;
-    private final long storageWriteMaxInflightBytes;
     // aggregate the total time spent in exponential backoff
     private final Counter throttlingMsecs =
         Metrics.counter(DatasetServiceImpl.class, "throttling-msecs");
 
     private @Nullable BoundedExecutorService executor;
     private final BigQueryIOMetadata bqIOMetadata;
 
     @VisibleForTesting
-    DatasetServiceImpl(
-        Bigquery client, @Nullable BigQueryWriteClient newWriteClient, PipelineOptions options) {
+    DatasetServiceImpl(Bigquery client, PipelineOptions options) {
       BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
       this.errorExtractor = new ApiErrorExtractor();
       this.client = client;
-      this.newWriteClient = newWriteClient;
       this.options = options;
       this.maxRowsPerBatch = bqOptions.getMaxStreamingRowsToBatch();
       this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize();
-      this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests();
-      this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes();
       this.bqIOMetadata = BigQueryIOMetadata.create();
       this.executor = null;
     }
 
     @VisibleForTesting
-    DatasetServiceImpl(
-        Bigquery client,
-        BigQueryWriteClient newWriteClient,
-        PipelineOptions options,
-        long maxRowsPerBatch) {
+    DatasetServiceImpl(Bigquery client, PipelineOptions options, long maxRowsPerBatch) {
       BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
       this.errorExtractor = new ApiErrorExtractor();
       this.client = client;
-      this.newWriteClient = newWriteClient;
       this.options = options;
       this.maxRowsPerBatch = maxRowsPerBatch;
       this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize();
-      this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests();
-      this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes();
       this.bqIOMetadata = BigQueryIOMetadata.create();
       this.executor = null;
     }
 
     public DatasetServiceImpl(BigQueryOptions bqOptions) {
       this.errorExtractor = new ApiErrorExtractor();
       this.client = newBigQueryClient(bqOptions).build();
-      this.newWriteClient = newBigQueryWriteClient(bqOptions);
       this.options = bqOptions;
       this.maxRowsPerBatch = bqOptions.getMaxStreamingRowsToBatch();
       this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize();
-      this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests();
-      this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes();
       this.bqIOMetadata = BigQueryIOMetadata.create();
       this.executor = null;
     }
 
@@ -1344,6 +1328,38 @@ public Table patchTableDescription(
           ALWAYS_RETRY);
     }
 
+    @Override
+    public void close() throws Exception {
+      // Nothing to close
+    }
+  }
+
+  @VisibleForTesting
+  public static class WriteStreamServiceImpl implements WriteStreamService {
+    private final BigQueryWriteClient newWriteClient;
+    private final long storageWriteMaxInflightRequests;
+    private final long storageWriteMaxInflightBytes;
+    private final BigQueryIOMetadata bqIOMetadata;
+    private final PipelineOptions options;
+
+    @VisibleForTesting
+    WriteStreamServiceImpl(BigQueryWriteClient newWriteClient, PipelineOptions options) {
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+      this.newWriteClient = newWriteClient;
+      this.options = options;
+      this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests();
+      this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes();
+      this.bqIOMetadata = BigQueryIOMetadata.create();
+    }
+
+    public WriteStreamServiceImpl(BigQueryOptions bqOptions) {
+      this.newWriteClient = newBigQueryWriteClient(bqOptions);
+      this.options = bqOptions;
+      this.storageWriteMaxInflightRequests = bqOptions.getStorageWriteMaxInflightRequests();
+      this.storageWriteMaxInflightBytes = bqOptions.getStorageWriteMaxInflightBytes();
+      this.bqIOMetadata = BigQueryIOMetadata.create();
+    }
+
     @Override
     public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
         throws IOException {
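
The lifecycle consequence is visible in the two close() implementations: DatasetServiceImpl now has nothing to release, while the BigQueryWriteClient is created and torn down only with WriteStreamServiceImpl, which is the point of the commit. A hedged usage sketch (the shutdown body of WriteStreamServiceImpl.close() is outside this excerpt and assumed):

```java
// Hedged sketch: DatasetServiceImpl.close() is the documented no-op above; the
// write-client shutdown in WriteStreamServiceImpl.close() is assumed, since that
// hunk is not shown in this excerpt.
static void lifecycle(BigQueryServicesImpl services, BigQueryOptions bqOptions) throws Exception {
  try (BigQueryServices.DatasetService datasets = services.getDatasetService(bqOptions)) {
    // table/dataset metadata calls; no BigQueryWriteClient behind this service
  } // no-op close
  try (BigQueryServices.WriteStreamService streams = services.getWriteStreamService(bqOptions)) {
    // createWriteStream / getStreamAppendClient / flush / finalize / commit
  } // tears down the BigQueryWriteClient
}
```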
@@ -25,7 +25,7 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.WriteStreamService;
 import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
 import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
 import org.apache.beam.sdk.metrics.Counter;
@@ -64,27 +64,29 @@ class StorageApiFinalizeWritesDoFn extends DoFn<KV<String, String>, Void> {
 
   private Map<String, Collection<String>> commitStreams;
   private final BigQueryServices bqServices;
-  private transient @Nullable DatasetService datasetService;
+  private transient @Nullable WriteStreamService writeStreamService;
 
   public StorageApiFinalizeWritesDoFn(BigQueryServices bqServices) {
     this.bqServices = bqServices;
     this.commitStreams = Maps.newHashMap();
-    this.datasetService = null;
+    this.writeStreamService = null;
   }
 
-  private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
-    if (datasetService == null) {
-      datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+  private WriteStreamService getWriteStreamService(PipelineOptions pipelineOptions)
+      throws IOException {
+    if (writeStreamService == null) {
+      writeStreamService =
+          bqServices.getWriteStreamService(pipelineOptions.as(BigQueryOptions.class));
     }
-    return datasetService;
+    return writeStreamService;
   }
 
   @Teardown
   public void onTeardown() {
     try {
-      if (datasetService != null) {
-        datasetService.close();
-        datasetService = null;
+      if (writeStreamService != null) {
+        writeStreamService.close();
+        writeStreamService = null;
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -101,7 +103,7 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, String>
       throws Exception {
     String tableId = element.getKey();
     String streamId = element.getValue();
-    DatasetService datasetService = getDatasetService(pipelineOptions);
+    WriteStreamService writeStreamService = getWriteStreamService(pipelineOptions);
 
     RetryManager<FinalizeWriteStreamResponse, Context<FinalizeWriteStreamResponse>> retryManager =
         new RetryManager<>(
@@ -113,7 +115,7 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, String>
     retryManager.addOperation(
         c -> {
           finalizeOperationsSent.inc();
-          return datasetService.finalizeWriteStream(streamId);
+          return writeStreamService.finalizeWriteStream(streamId);
         },
         contexts -> {
           RetryManager.Operation.Context<FinalizeWriteStreamResponse> firstContext =
@@ -145,7 +147,7 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, String>
 
   @FinishBundle
   public void finishBundle(PipelineOptions pipelineOptions) throws Exception {
-    DatasetService datasetService = getDatasetService(pipelineOptions);
+    WriteStreamService writeStreamService = getWriteStreamService(pipelineOptions);
     for (Map.Entry<String, Collection<String>> entry : commitStreams.entrySet()) {
       final String tableId = entry.getKey();
       final Collection<String> streamNames = entry.getValue();
@@ -161,7 +163,7 @@ public void finishBundle(PipelineOptions pipelineOptions) throws Exception {
             Iterable<String> streamsToCommit =
                 Iterables.filter(streamNames, s -> !alreadyCommittedStreams.contains(s));
             batchCommitOperationsSent.inc();
-            return datasetService.commitWriteStreams(tableId, streamsToCommit);
+            return writeStreamService.commitWriteStreams(tableId, streamsToCommit);
           },
           contexts -> {
             RetryManager.Operation.Context<BatchCommitWriteStreamsResponse> firstContext =
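
Both DoFns in this commit share the same service-lifecycle pattern: a transient field, created lazily on first use on each worker and released in @Teardown, so a worker that never touches the Storage Write API never builds a BigQueryWriteClient. A condensed, hedged sketch of that pattern, assuming the same package as the DoFns above (the DoFn shape and element type are illustrative; the getter and teardown mirror the diff):

```java
import java.io.IOException;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.checkerframework.checker.nullness.qual.Nullable;

// Condensed sketch of the lazy-init/teardown pattern; element type and
// processing body are placeholders.
class WriteStreamUsingDoFn extends DoFn<String, Void> {
  private final BigQueryServices bqServices;
  private transient @Nullable BigQueryServices.WriteStreamService writeStreamService;

  WriteStreamUsingDoFn(BigQueryServices bqServices) {
    this.bqServices = bqServices; // serialized with the DoFn; services built per worker
  }

  private BigQueryServices.WriteStreamService service(PipelineOptions opts) throws IOException {
    if (writeStreamService == null) { // lazy: only pay for the client if this DoFn runs
      writeStreamService = bqServices.getWriteStreamService(opts.as(BigQueryOptions.class));
    }
    return writeStreamService;
  }

  @ProcessElement
  public void process(PipelineOptions opts, @Element String streamId) throws Exception {
    service(opts).finalizeWriteStream(streamId);
  }

  @Teardown
  public void onTeardown() {
    try {
      if (writeStreamService != null) {
        writeStreamService.close(); // releases the underlying BigQueryWriteClient
        writeStreamService = null;
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```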
@@ -26,7 +26,7 @@
 import java.time.Instant;
 import java.util.Objects;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.WriteStreamService;
 import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
 import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
 import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn.Operation;
@@ -50,7 +50,7 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
   private static final Logger LOG = LoggerFactory.getLogger(StorageApiFlushAndFinalizeDoFn.class);
 
   private final BigQueryServices bqServices;
-  private transient @Nullable DatasetService datasetService = null;
+  private transient @Nullable WriteStreamService writeStreamService = null;
   private final Counter flushOperationsSent =
       Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsSent");
   private final Counter flushOperationsSucceeded =
@@ -112,19 +112,21 @@ public StorageApiFlushAndFinalizeDoFn(BigQueryServices bqServices) {
     this.bqServices = bqServices;
   }
 
-  private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
-    if (datasetService == null) {
-      datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+  private WriteStreamService getWriteStreamService(PipelineOptions pipelineOptions)
+      throws IOException {
+    if (writeStreamService == null) {
+      writeStreamService =
+          bqServices.getWriteStreamService(pipelineOptions.as(BigQueryOptions.class));
     }
-    return datasetService;
+    return writeStreamService;
  }
 
   @Teardown
   public void onTeardown() {
     try {
-      if (datasetService != null) {
-        datasetService.close();
-        datasetService = null;
+      if (writeStreamService != null) {
+        writeStreamService.close();
+        writeStreamService = null;
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -136,7 +138,7 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
       throws Exception {
     final String streamId = element.getKey();
     final Operation operation = element.getValue();
-    final DatasetService datasetService = getDatasetService(pipelineOptions);
+    final WriteStreamService datasetService = getWriteStreamService(pipelineOptions);
     // Flush the stream. If the flush offset < 0, that means we only need to finalize.
     long offset = operation.flushOffset;
     if (offset >= 0) {
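
The comment in the last hunk encodes a small protocol: a negative flushOffset means there is nothing to flush and the stream only needs finalizing. A hedged sketch of that branch (flushRows and the finalizeStream field are assumed names based on this excerpt's comment and the WriteStreamService javadoc, not shown signatures):

```java
// Hedged sketch of the flush-offset convention: offset < 0 means finalize only.
// flushRows and operation.finalizeStream are assumptions, not shown above.
static void flushAndMaybeFinalize(
    BigQueryServices.WriteStreamService writeStreamService, String streamId, Operation operation)
    throws Exception {
  long offset = operation.flushOffset; // from the hunk above
  if (offset >= 0) {
    writeStreamService.flushRows(streamId, offset); // flush rows up to the offset
  }
  if (operation.finalizeStream) { // assumed field name
    writeStreamService.finalizeWriteStream(streamId);
  }
}
```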