BQ write client for exactly once (#152)
Client definition for the BigQuery Storage Write API, used by the connector's
exactly-once sink.
jayehwhyehentee authored Sep 17, 2024
1 parent 83226c5 commit c23ec8b
Showing 3 changed files with 106 additions and 0 deletions.
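
The methods added in this commit back the exactly-once write path: rows are appended to a BUFFERED write stream, made visible with flushRows once a commit (for example, a Flink checkpoint) completes, and the stream is sealed with finalizeWriteStream when writing ends. Below is a minimal sketch of how these methods are presumably meant to fit together; the interface name and package are inferred from the StorageWriteClientImpl shown later in this diff, and tablePath, protoSchema, and protoRows are placeholders supplied by the caller.

    import com.google.cloud.bigquery.storage.v1.ProtoRows;
    import com.google.cloud.bigquery.storage.v1.ProtoSchema;
    import com.google.cloud.bigquery.storage.v1.StreamWriter;
    import com.google.cloud.bigquery.storage.v1.WriteStream;
    // Interface name and package are assumed, not shown in this diff; adjust if they differ.
    import com.google.cloud.flink.bigquery.services.BigQueryServices.StorageWriteClient;

    /** Illustrative sketch only; not part of this commit. */
    class ExactlyOnceWriteSketch {

        static void writeExactlyOnce(
                StorageWriteClient writeClient,
                String tablePath,
                ProtoSchema protoSchema,
                ProtoRows protoRows)
                throws Exception {
            // 1. Create a BUFFERED write stream: appended rows stay invisible until flushed.
            WriteStream stream =
                    writeClient.createWriteStream(tablePath, WriteStream.Type.BUFFERED);

            // 2. Append rows at an explicit offset through a StreamWriter bound to the stream.
            StreamWriter writer =
                    writeClient.createStreamWriter(stream.getName(), protoSchema, false);
            writer.append(protoRows, /* offset= */ 0L).get();

            // 3. On commit, make rows up to and including the last appended offset visible.
            writeClient.flushRows(stream.getName(), protoRows.getSerializedRowsCount() - 1L);

            // 4. Once writing is finished, finalize the stream so nothing more can be appended.
            writeClient.finalizeWriteStream(stream.getName());
            writer.close();
        }
    }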
@@ -31,13 +31,16 @@
import com.google.cloud.bigquery.storage.v1.AvroSchema;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadStream;
import com.google.cloud.bigquery.storage.v1.StreamStats;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.common.config.CredentialsOptions;
import com.google.cloud.flink.bigquery.common.utils.BigQueryPartitionUtils;
@@ -323,6 +326,21 @@ public StreamWriter createStreamWriter(
return mockedWriter;
}

@Override
public WriteStream createWriteStream(String tablePath, WriteStream.Type streamType) {
throw new UnsupportedOperationException("fake createWriteStream not supported");
}

@Override
public FlushRowsResponse flushRows(String streamName, long offset) {
throw new UnsupportedOperationException("fake flushRows not supported");
}

@Override
public FinalizeWriteStreamResponse finalizeWriteStream(String streamName) {
throw new UnsupportedOperationException("fake finalizeWriteStream not supported");
}

@Override
public void close() {
Mockito.when(mockedWriter.isUserClosed()).thenReturn(true);
@@ -21,11 +21,14 @@
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.flink.bigquery.common.config.CredentialsOptions;

import java.io.IOException;
@@ -126,6 +129,32 @@ StreamWriter createStreamWriter(
String streamName, ProtoSchema protoSchema, boolean enableConnectionPool)
throws IOException;

/**
* Create a write stream for a BigQuery table.
*
* @param tablePath the table to which the stream belongs.
* @param streamType the type of the stream.
* @return A WriteStream.
*/
WriteStream createWriteStream(String tablePath, WriteStream.Type streamType);

/**
* Flush data in a buffered write stream to the BigQuery table.
*
* @param streamName the write stream to be flushed.
* @param offset the offset up to which the write stream must be flushed.
* @return A FlushRowsResponse.
*/
FlushRowsResponse flushRows(String streamName, long offset);

/**
* Finalize a BigQuery write stream so that no new data can be appended to the stream.
*
* @param streamName the write stream to be finalized.
* @return A FinalizeWriteStreamResponse.
*/
FinalizeWriteStreamResponse finalizeWriteStream(String streamName);

/**
* Close the client object.
*
@@ -43,16 +43,23 @@
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.flink.bigquery.common.config.CredentialsOptions;
import com.google.cloud.flink.bigquery.common.utils.BigQueryPartitionUtils;
import com.google.cloud.flink.bigquery.common.utils.BigQueryTableInfo;
import com.google.protobuf.Int64Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;
@@ -194,6 +201,35 @@ private StorageWriteClientImpl(CredentialsOptions options) throws IOException {
.setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
.build());

UnaryCallSettings.Builder<CreateWriteStreamRequest, WriteStream>
createWriteStreamSettings =
settingsBuilder.getStubSettingsBuilder().createWriteStreamSettings();
createWriteStreamSettings.setRetrySettings(
createWriteStreamSettings
.getRetrySettings()
.toBuilder()
.setMaxAttempts(10)
.build());

UnaryCallSettings.Builder<FlushRowsRequest, FlushRowsResponse> flushRowsSettings =
settingsBuilder.getStubSettingsBuilder().flushRowsSettings();
flushRowsSettings.setRetrySettings(
flushRowsSettings
.getRetrySettings()
.toBuilder()
.setMaxAttempts(10)
.build());

UnaryCallSettings.Builder<FinalizeWriteStreamRequest, FinalizeWriteStreamResponse>
finalizeWriteStreamSettings =
settingsBuilder.getStubSettingsBuilder().finalizeWriteStreamSettings();
finalizeWriteStreamSettings.setRetrySettings(
finalizeWriteStreamSettings
.getRetrySettings()
.toBuilder()
.setMaxAttempts(10)
.build());

this.client = BigQueryWriteClient.create(settingsBuilder.build());
}

@@ -230,6 +266,29 @@ public StreamWriter createStreamWriter(
.build();
}

@Override
public WriteStream createWriteStream(String tablePath, WriteStream.Type streamType) {
return this.client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tablePath)
.setWriteStream(WriteStream.newBuilder().setType(streamType).build())
.build());
}

@Override
public FlushRowsResponse flushRows(String streamName, long offset) {
return this.client.flushRows(
FlushRowsRequest.newBuilder()
.setWriteStream(streamName)
.setOffset(Int64Value.of(offset))
.build());
}

@Override
public FinalizeWriteStreamResponse finalizeWriteStream(String streamName) {
return client.finalizeWriteStream(streamName);
}

@Override
public void close() {
client.close();
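
For reference, the retry overrides in the constructor above all follow the same gax pattern: take a unary call's existing RetrySettings, raise maxAttempts to 10, and set the result back on the same UnaryCallSettings.Builder. A self-contained sketch of that pattern, with an illustrative helper name that is not part of the connector:

    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.rpc.UnaryCallSettings;

    /** Illustrative helper only; not part of this commit. */
    final class RetryTuningSketch {

        /** Allows a unary call to be attempted at most ten times. */
        static <ReqT, RespT> void allowUpToTenAttempts(
                UnaryCallSettings.Builder<ReqT, RespT> callSettings) {
            RetrySettings tuned =
                    callSettings.getRetrySettings().toBuilder().setMaxAttempts(10).build();
            callSettings.setRetrySettings(tuned);
        }
    }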
