Skip to content

Commit

Permalink
Change Spanner Change Streams to BigQuery template parameter from "sp…
Browse files Browse the repository at this point in the history
…annerRpcPriority" to "rpcPriority"

PiperOrigin-RevId: 470346648
  • Loading branch information
cloud-teleport committed Aug 26, 2022
1 parent e49210a commit 9140f7e
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ public interface SpannerChangeStreamsToBigQueryOptions extends DataflowPipelineO
"Priority for Spanner RPC invocations. Defaults to HIGH. Allowed priorites are LOW, MEDIUM,"
+ " HIGH.")
@Default.Enum("HIGH")
RpcPriority getSpannerRpcPriority();
RpcPriority getRpcPriority();

void setSpannerRpcPriority(RpcPriority value);
void setRpcPriority(RpcPriority value);

@Description("Spanner host endpoint (only used for testing).")
@Default.String("https://batch-spanner.googleapis.com")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ private static void validateOptions(SpannerChangeStreamsToBigQueryOptions option
}

private static void setOptions(SpannerChangeStreamsToBigQueryOptions options) {
LOG.info("Setting streaing options");
options.setStreaming(true);
options.setEnableStreamingEngine(true);

Expand Down Expand Up @@ -161,13 +162,15 @@ public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options)
? Timestamp.MAX_VALUE
: Timestamp.parseTimestamp(options.getEndTimestamp());

LOG.info("Getting RPC priority");

SpannerConfig spannerConfig =
SpannerConfig.create()
.withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
.withProjectId(spannerProjectId)
.withInstanceId(options.getSpannerInstanceId())
.withDatabaseId(options.getSpannerDatabase())
.withRpcPriority(options.getSpannerRpcPriority());
.withRpcPriority(options.getRpcPriority());

SpannerIO.ReadChangeStream readChangeStream =
SpannerIO.readChangeStream()
Expand All @@ -177,7 +180,7 @@ public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options)
.withChangeStreamName(options.getSpannerChangeStreamName())
.withInclusiveStartAt(startTimestamp)
.withInclusiveEndAt(endTimestamp)
.withRpcPriority(options.getSpannerRpcPriority());
.withRpcPriority(options.getRpcPriority());

String spannerMetadataTableName = options.getSpannerMetadataTableName();
if (spannerMetadataTableName != null) {
Expand Down

0 comments on commit 9140f7e

Please sign in to comment.