From b3b2baf50f7c01e284d4d4db2577d04bd6304d6f Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 31 Jan 2024 13:28:49 -0500 Subject: [PATCH 1/9] properly fetch TableRow field names --- .../teleport/v2/templates/BigQueryDynamicConverters.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryDynamicConverters.java b/v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryDynamicConverters.java index 63112afb3e..7e9b100a76 100644 --- a/v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryDynamicConverters.java +++ b/v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryDynamicConverters.java @@ -147,14 +147,11 @@ public TableSchema getSchema(KV destination) { TableRow bqRow = destination.getValue(); TableSchema schema = new TableSchema(); List fields = new ArrayList(); - List cells = bqRow.getF(); - /** currently all BQ data types are set to String */ - for (Map object : cells) { - String header = object.keySet().iterator().next(); + for (String field : bqRow.keySet()) { /** currently all BQ data types are set to String */ // Why do we use checkHeaderName here and not elsewhere, TODO if we add this back in // fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING")); - fields.add(new TableFieldSchema().setName(header).setType("STRING")); + fields.add(new TableFieldSchema().setName(field).setType("STRING")); } schema.setFields(fields); From 26179b04963530b04b7720bea340d671afd0c4f7 Mon Sep 17 00:00:00 2001 From: Manit Gupta Date: Wed, 31 Jan 2024 17:33:32 +0530 Subject: [PATCH 2/9] Add RpcPriority flag and change default --- .../v2/templates/DataStreamToSpanner.java | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java index a30690c1e2..5872f84802 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java @@ -16,6 +16,7 @@ package com.google.cloud.teleport.v2.templates; import com.google.api.services.datastream.v1.model.SourceConfig; +import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.teleport.metadata.Template; import com.google.cloud.teleport.metadata.TemplateCategory; import com.google.cloud.teleport.metadata.TemplateParameter; @@ -361,6 +362,23 @@ public interface Options extends PipelineOptions, StreamingOptions { Integer getDirectoryWatchDurationInMinutes(); void setDirectoryWatchDurationInMinutes(Integer value); + + @TemplateParameter.Enum( + order = 23, + enumOptions = { + @TemplateEnumOption("LOW"), + @TemplateEnumOption("MEDIUM"), + @TemplateEnumOption("HIGH") + }, + optional = true, + description = "Priority for Spanner RPC invocations", + helpText = + "The request priority for Cloud Spanner calls. The value must be one of:" + + " [HIGH,MEDIUM,LOW]. 
Defaults to HIGH") + @Default.Enum("HIGH") + RpcPriority getSpannerPriority(); + + void setSpannerPriority(RpcPriority value); } private static void validateSourceType(Options options) { @@ -460,13 +478,15 @@ public static PipelineResult run(Options options) { * c) Reconsume Dead Letter Queue data from GCS into JSON String FailsafeElements * d) Flatten DataStream and DLQ Streams */ + // Prepare Spanner config SpannerConfig spannerConfig = SpannerConfig.create() .withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId())) .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost())) .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId())) - .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId())); + .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId())) + .withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority())); /* Process information schema * 1) Read information schema from destination Cloud Spanner database From d3d8c0307273ab76358bd36f4fa7aeb0a3c31529 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 31 Jan 2024 13:33:23 -0500 Subject: [PATCH 3/9] apply fix to other places --- .../templates/common/BigQueryDynamicConverters.java | 7 ++----- .../teleport/v2/transforms/BigQueryDynamicConverters.java | 7 ++----- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java b/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java index 9d696e9a3d..d7ee77b0e5 100644 --- a/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java +++ b/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java @@ -161,14 +161,11 @@ public TableSchema getSchema(KV destination) { TableRow bqRow = destination.getValue(); TableSchema schema = new TableSchema(); List fields = new ArrayList(); - List cells = bqRow.getF(); - for (int i = 0; i < cells.size(); i++) { - Map object = cells.get(i); - String header = object.keySet().iterator().next(); + for (String field : bqRow.keySet()) { /** currently all BQ data types are set to String */ // Why do we use checkHeaderName here and not elsewhere, TODO if we add this back in // fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING")); - fields.add(new TableFieldSchema().setName(header).setType("STRING")); + fields.add(new TableFieldSchema().setName(field).setType("STRING")); } schema.setFields(fields); diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicConverters.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicConverters.java index 47c1b44b43..40d55b352b 100644 --- a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicConverters.java +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicConverters.java @@ -146,14 +146,11 @@ public TableSchema getSchema(KV destination) { TableRow bqRow = destination.getValue(); TableSchema schema = new TableSchema(); List fields = new ArrayList(); - List cells = bqRow.getF(); - for (int i = 0; i < cells.size(); i++) { - Map object = cells.get(i); - String header = object.keySet().iterator().next(); + for (String field : bqRow.keySet()) { /** currently all BQ data types are set to String */ // Why do we use checkHeaderName here and not elsewhere, TODO if we 
add this back in // fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING")); - fields.add(new TableFieldSchema().setName(header).setType("STRING")); + fields.add(new TableFieldSchema().setName(field).setType("STRING")); } schema.setFields(fields); From 6889f90fc3583a5da77f241415f7b315afb54a42 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 31 Jan 2024 13:44:15 -0500 Subject: [PATCH 4/9] spotless --- .../teleport/templates/common/BigQueryDynamicConverters.java | 3 --- .../teleport/v2/transforms/BigQueryDynamicConverters.java | 2 -- .../cloud/teleport/v2/templates/BigQueryDynamicConverters.java | 2 -- 3 files changed, 7 deletions(-) diff --git a/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java b/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java index d7ee77b0e5..1c9dbbe79d 100644 --- a/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java +++ b/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java @@ -15,15 +15,12 @@ */ package com.google.cloud.teleport.templates.common; -// import com.google.cloud.teleport.templates.common.BigQueryConverters; -import com.google.api.services.bigquery.model.TableCell; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.bigquery.TableId; import java.util.ArrayList; import java.util.List; -import java.util.Map; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.options.ValueProvider; diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicConverters.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicConverters.java index 40d55b352b..6b25a8ad6c 100644 --- a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicConverters.java +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicConverters.java @@ -15,14 +15,12 @@ */ package com.google.cloud.teleport.v2.transforms; -import com.google.api.services.bigquery.model.TableCell; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.bigquery.TableId; import java.util.ArrayList; import java.util.List; -import java.util.Map; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.transforms.MapElements; diff --git a/v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryDynamicConverters.java b/v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryDynamicConverters.java index 7e9b100a76..0ae14227cc 100644 --- a/v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryDynamicConverters.java +++ b/v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryDynamicConverters.java @@ -15,7 +15,6 @@ */ package com.google.cloud.teleport.v2.templates; -import com.google.api.services.bigquery.model.TableCell; import com.google.api.services.bigquery.model.TableFieldSchema; import 
com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; @@ -23,7 +22,6 @@ import com.google.cloud.teleport.v2.transforms.BigQueryConverters; import java.util.ArrayList; import java.util.List; -import java.util.Map; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.transforms.MapElements; From fb39b3ef8e3c7bfb04592b63b9c3c78b08cf94a5 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 31 Jan 2024 15:14:24 -0500 Subject: [PATCH 5/9] touch file --- .../teleport/templates/common/BigQueryDynamicConverters.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java b/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java index 1c9dbbe79d..df3dd933bb 100644 --- a/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java +++ b/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java @@ -159,7 +159,7 @@ public TableSchema getSchema(KV destination) { TableSchema schema = new TableSchema(); List fields = new ArrayList(); for (String field : bqRow.keySet()) { - /** currently all BQ data types are set to String */ + /** currently all BQ data types are set to String */ // Why do we use checkHeaderName here and not elsewhere, TODO if we add this back in // fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING")); fields.add(new TableFieldSchema().setName(field).setType("STRING")); From c247035171ea6ba3eefa72324bfffe7e2f678af6 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 31 Jan 2024 15:19:09 -0500 Subject: [PATCH 6/9] spotless --- .../teleport/templates/common/BigQueryDynamicConverters.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java b/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java index df3dd933bb..1c9dbbe79d 100644 --- a/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java +++ b/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java @@ -159,7 +159,7 @@ public TableSchema getSchema(KV destination) { TableSchema schema = new TableSchema(); List fields = new ArrayList(); for (String field : bqRow.keySet()) { - /** currently all BQ data types are set to String */ + /** currently all BQ data types are set to String */ // Why do we use checkHeaderName here and not elsewhere, TODO if we add this back in // fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING")); fields.add(new TableFieldSchema().setName(field).setType("STRING")); From cfe9196f23c26bf4c464c0a6666258c06e401c27 Mon Sep 17 00:00:00 2001 From: Manit Gupta Date: Thu, 8 Feb 2024 00:12:16 +0530 Subject: [PATCH 7/9] Add RpcPriority to Txn --- .../teleport/v2/templates/SpannerTransactionWriterDoFn.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java index 9484c99f5f..63d4386c10 100644 --- 
a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java @@ -213,7 +213,7 @@ public void processElement(ProcessContext c) { // Start transaction spannerAccessor .getDatabaseClient() - .readWriteTransaction(Options.tag(getTxnTag(c.getPipelineOptions()))) + .readWriteTransaction(Options.tag(getTxnTag(c.getPipelineOptions())), Options.priority(spannerConfig.getRpcPriority().get())) .run( (TransactionCallable) transaction -> { From 163b1005677ec6c15c48c0ec56008b1da6f6fea5 Mon Sep 17 00:00:00 2001 From: Manit Gupta Date: Thu, 8 Feb 2024 00:16:42 +0530 Subject: [PATCH 8/9] spotless --- .../teleport/v2/templates/SpannerTransactionWriterDoFn.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java index 63d4386c10..1598c1a677 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java @@ -213,7 +213,9 @@ public void processElement(ProcessContext c) { // Start transaction spannerAccessor .getDatabaseClient() - .readWriteTransaction(Options.tag(getTxnTag(c.getPipelineOptions())), Options.priority(spannerConfig.getRpcPriority().get())) + .readWriteTransaction( + Options.tag(getTxnTag(c.getPipelineOptions())), + Options.priority(spannerConfig.getRpcPriority().get())) .run( (TransactionCallable) transaction -> { From f5a1744a4ec89887f9a70380d5a9ac31871e0831 Mon Sep 17 00:00:00 2001 From: aksharau Date: Thu, 8 Feb 2024 00:21:32 -0800 Subject: [PATCH 9/9] Give ability to use PubSub notification based GCS lookup of retry files. This ensures that the workers scale as per the DLQ backlog, multiple workers do not process the same retry files and the pipeline is more efficient. PiperOrigin-RevId: 605224270 --- .../v2/cdc/dlq/DeadLetterQueueManager.java | 12 +- .../v2/cdc/dlq/PubSubNotifiedDlqIO.java | 92 ++++++++++++++ .../v2/templates/DataStreamToSpanner.java | 115 ++++++++++-------- 3 files changed, 162 insertions(+), 57 deletions(-) create mode 100644 v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/PubSubNotifiedDlqIO.java diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/DeadLetterQueueManager.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/DeadLetterQueueManager.java index ecb35fe0ab..f1dd2eb237 100644 --- a/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/DeadLetterQueueManager.java +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/DeadLetterQueueManager.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.Serializable; import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -37,14 +38,11 @@ * A manager for the Dead Letter Queue of a pipeline. It helps build re-consumers, and DLQ sinks. 
*/ public class DeadLetterQueueManager implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(DeadLetterQueueManager.class); - private static final String DATETIME_FILEPATH_SUFFIX = "YYYY/MM/dd/HH/mm/"; private final String retryDlqDirectory; private final String severeDlqDirectory; private final int maxRetries; - /* The tag for change events which were retried over the specified count */ public static final TupleTag> PERMANENT_ERRORS = new TupleTag>(); @@ -77,7 +75,6 @@ public static DeadLetterQueueManager create(String dlqDirectory, int maxRetries) public static DeadLetterQueueManager create( String dlqDirectory, String retryDlqUri, int maxRetries) { - String severeDlqUri = FileSystems.matchNewResource(dlqDirectory, true) .resolve("severe", StandardResolveOptions.RESOLVE_DIRECTORY) @@ -109,6 +106,12 @@ public PTransform> dlqReconsumer(Integer recheckPeri return FileBasedDeadLetterQueueReconsumer.create(retryDlqDirectory, recheckPeriodMinutes); } + public PCollectionTuple getReconsumerDataTransformForFiles(PCollection input) { + return getReconsumerDataTransform( + input.apply( + "Move and consume", FileBasedDeadLetterQueueReconsumer.moveAndConsumeMatches())); + } + public PCollectionTuple getReconsumerDataTransform(PCollection reconsumedElements) { return reconsumedElements.apply( ParDo.of( @@ -132,7 +135,6 @@ public void process(@Element String input, MultiOutputReceiver output) { output.get(RETRYABLE_ERRORS).output(element); return; } - String error = jsonDLQElement.get("_metadata_error").asText(); element.setErrorMessage(error); output.get(PERMANENT_ERRORS).output(element); diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/PubSubNotifiedDlqIO.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/PubSubNotifiedDlqIO.java new file mode 100644 index 0000000000..98b4dbabe3 --- /dev/null +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/PubSubNotifiedDlqIO.java @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.cdc.dlq; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.List; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a DLQ reconsumer where the DLQ files reside in the GCS and the file path comes as a + * PubSub message. 
It also skips files under specific configured directories, such as temporary and + * severe, so that even if there are misconfigurations done when setting the GCS notifier, the list + * of directories to ignore can be configured. + */ +public class PubSubNotifiedDlqIO extends PTransform<PBegin, PCollection<Metadata>> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubNotifiedDlqIO.class); + + private final String gcsNotificationSubscription; + private final List<String> filePathsToIgnore; + + public PubSubNotifiedDlqIO(String gcsNotificationSubscription, List<String> filePathsToIgnore) { + this.gcsNotificationSubscription = gcsNotificationSubscription; + this.filePathsToIgnore = filePathsToIgnore; + } + + @Override + public PCollection<Metadata> expand(PBegin input) { + return input + .apply( + "ReadGcsPubSubSubscription", + PubsubIO.readMessagesWithAttributes().fromSubscription(gcsNotificationSubscription)) + .apply("ExtractGcsFilePath", ParDo.of(new ExtractGcsFileForRetry())); + } + + class ExtractGcsFileForRetry extends DoFn<PubsubMessage, Metadata> { + @ProcessElement + public void process(ProcessContext context) throws IOException { + PubsubMessage message = context.element(); + String eventType = message.getAttribute("eventType"); + String bucketId = message.getAttribute("bucketId"); + String objectId = message.getAttribute("objectId"); + if (eventType.equals("OBJECT_FINALIZE") && !objectId.endsWith("/")) { + String fileName = "gs://" + bucketId + "/" + objectId; + try { + Metadata fileMetadata = FileSystems.matchSingleFileSpec(fileName); + if (filePathsToIgnore != null) { + for (String filePathToIgnore : filePathsToIgnore) { + if (fileMetadata.resourceId().toString().contains(filePathToIgnore)) { + LOG.info( + "Ignoring severe or temporary file during retry processing {} due to ignore" + + " path {} ", + fileName, + filePathToIgnore); + return; + } + } + } + context.output(fileMetadata); + } catch (FileNotFoundException e) { + LOG.warn("Ignoring non-existent file {}", fileName, e); + } catch (IOException e) { + LOG.error("GCS Failure retrieving {}", fileName, e); + throw e; + } + } + } + } +} diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java index 5872f84802..def15f9104 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java @@ -22,6 +22,7 @@ import com.google.cloud.teleport.metadata.TemplateParameter; import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption; import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager; +import com.google.cloud.teleport.v2.cdc.dlq.PubSubNotifiedDlqIO; import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer; import com.google.cloud.teleport.v2.coders.FailsafeElementCoder; import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger; @@ -37,7 +38,10 @@ import com.google.cloud.teleport.v2.templates.spanner.ProcessInformationSchema; import com.google.cloud.teleport.v2.transforms.DLQWriteTransform; import com.google.cloud.teleport.v2.values.FailsafeElement; +import com.google.common.base.Strings; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ 
-79,19 +83,30 @@ category = TemplateCategory.STREAMING, displayName = "Datastream to Cloud Spanner", description = { - "The Datastream to Cloud Spanner template is a streaming pipeline that reads Datastream events from a Cloud Storage bucket and writes them to a Cloud Spanner database. " - + "It is intended for data migration from Datastream sources to Cloud Spanner.\n", - "All tables required for migration must exist in the destination Cloud Spanner database prior to template execution. " - + "Hence schema migration from a source database to destination Cloud Spanner must be completed prior to data migration. " - + "Data can exist in the tables prior to migration. This template does not propagate Datastream schema changes to the Cloud Spanner database.\n", - "Data consistency is guaranteed only at the end of migration when all data has been written to Cloud Spanner. " - + "To store ordering information for each record written to Cloud Spanner, this template creates an additional table (called a shadow table) for each table in the Cloud Spanner database. " - + "This is used to ensure consistency at the end of migration. The shadow tables are not deleted after migration and can be used for validation purposes at the end of migration.\n", - "Any errors that occur during operation, such as schema mismatches, malformed JSON files, or errors resulting from executing transforms, are recorded in an error queue. " - + "The error queue is a Cloud Storage folder which stores all the Datastream events that had encountered errors along with the error reason in text format. " - + "The errors can be transient or permanent and are stored in appropriate Cloud Storage folders in the error queue. " - + "The transient errors are retried automatically while the permanent errors are not. " - + "In case of permanent errors, you have the option of making corrections to the change events and moving them to the retriable bucket while the template is running." + "The Datastream to Cloud Spanner template is a streaming pipeline that reads Datastream events from a Cloud" + + " Storage bucket and writes them to a Cloud Spanner database. It is intended for data" + + " migration from Datastream sources to Cloud Spanner.\n", + "All tables required for migration must exist in the destination Cloud Spanner database prior" + + " to template execution. Hence schema migration from a source database to destination" + + " Cloud Spanner must be completed prior to data migration. Data can exist in the tables" + + " prior to migration. This template does not propagate Datastream schema changes to the" + + " Cloud Spanner database.\n", + "Data consistency is guaranteed only at the end of migration when all data has been written" + + " to Cloud Spanner. To store ordering information for each record written to Cloud" + + " Spanner, this template creates an additional table (called a shadow table) for each" + + " table in the Cloud Spanner database. This is used to ensure consistency at the end of" + + " migration. The shadow tables are not deleted after migration and can be used for" + + " validation purposes at the end of migration.\n", + "Any errors that occur during operation, such as schema mismatches, malformed JSON files, or" + + " errors resulting from executing transforms, are recorded in an error queue. The error" + + " queue is a Cloud Storage folder which stores all the Datastream events that had" + + " encountered errors along with the error reason in text format. 
The errors can be" + + " transient or permanent and are stored in appropriate Cloud Storage folders in the" + + " error queue. The transient errors are retried automatically while the permanent" + + " errors are not. In case of permanent errors, you have the option of making" + + " corrections to the change events and moving them to the retriable bucket while the" + + " template is running." }, optionsClass = Options.class, flexContainerName = "datastream-to-spanner", @@ -105,7 +120,6 @@ }, streaming = true) public class DataStreamToSpanner { - private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpanner.class); private static final String AVRO_SUFFIX = "avro"; private static final String JSON_SUFFIX = "json"; @@ -116,7 +130,6 @@ public class DataStreamToSpanner { *

Inherits standard configuration options. */ public interface Options extends PipelineOptions, StreamingOptions { - @TemplateParameter.Text( order = 1, description = "File location for Datastream file output in Cloud Storage.", @@ -379,6 +392,21 @@ public interface Options extends PipelineOptions, StreamingOptions { RpcPriority getSpannerPriority(); void setSpannerPriority(RpcPriority value); + + @TemplateParameter.PubsubSubscription( + order = 24, + optional = true, + description = + "The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ" + + " retry directory when running in regular mode.", + helpText = + "The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ" + + " retry directory when running in regular mode. The name should be in the format" + + " of projects//subscriptions/. When set, the" + + " deadLetterQueueDirectory and dlqRetryMinutes are ignored.") + String getDlqGcsPubSubSubscription(); + + void setDlqGcsPubSubSubscription(String value); } private static void validateSourceType(Options options) { @@ -402,11 +430,9 @@ private static String getSourceType(Options options) { if (options.getDatastreamSourceType() != null) { return options.getDatastreamSourceType(); } - if (options.getStreamName() == null) { throw new IllegalArgumentException("Stream name cannot be empty. "); } - GcpOptions gcpOptions = options.as(GcpOptions.class); DataStreamClient datastreamClient; SourceConfig sourceConfig; @@ -417,7 +443,6 @@ private static String getSourceType(Options options) { LOG.error("IOException Occurred: DataStreamClient failed initialization."); throw new IllegalArgumentException("Unable to initialize DatastreamClient: " + e); } - // TODO: use getPostgresSourceConfig() instead of an else once SourceConfig.java is updated. if (sourceConfig.getMysqlSourceConfig() != null) { return DatastreamConstants.MYSQL_SOURCE_TYPE; @@ -426,7 +451,6 @@ private static String getSourceType(Options options) { } else { return DatastreamConstants.POSTGRES_SOURCE_TYPE; } - // LOG.error("Source Connection Profile Type Not Supported"); // throw new IllegalArgumentException("Unsupported source connection profile type in // Datastream"); @@ -439,15 +463,10 @@ private static String getSourceType(Options options) { */ public static void main(String[] args) { UncaughtExceptionLogger.register(); - LOG.info("Starting DataStream to Cloud Spanner"); - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setStreaming(true); - validateSourceType(options); - run(options); } @@ -464,10 +483,8 @@ public static PipelineResult run(Options options) { * 2) Write JSON Strings to Cloud Spanner * 3) Write Failures to GCS Dead Letter Queue */ - Pipeline pipeline = Pipeline.create(options); DeadLetterQueueManager dlqManager = buildDlqManager(options); - // Ingest session file into schema object. 
Schema schema = SessionFileReader.read(options.getSessionFilePath()); /* @@ -487,7 +504,6 @@ public static PipelineResult run(Options options) { .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId())) .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId())) .withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority())); - /* Process information schema * 1) Read information schema from destination Cloud Spanner database * 2) Check if shadow tables are present and create if necessary @@ -502,26 +518,33 @@ public static PipelineResult run(Options options) { options.getShadowTablePrefix(), options.getDatastreamSourceType())); PCollectionView ddlView = ddl.apply("Cloud Spanner DDL as view", View.asSingleton()); - PCollection> jsonRecords = null; - // Elements sent to the Dead Letter Queue are to be reconsumed. // A DLQManager is to be created using PipelineOptions, and it is in charge // of building pieces of the DLQ. - PCollectionTuple reconsumedElements = - dlqManager.getReconsumerDataTransform( - pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes()))); + PCollectionTuple reconsumedElements = null; + boolean isRegularMode = "regular".equals(options.getRunMode()); + if (isRegularMode && (!Strings.isNullOrEmpty(options.getDlqGcsPubSubSubscription()))) { + reconsumedElements = + dlqManager.getReconsumerDataTransformForFiles( + pipeline.apply( + "Read retry from PubSub", + new PubSubNotifiedDlqIO( + options.getDlqGcsPubSubSubscription(), + // file paths to ignore when re-consuming for retry + new ArrayList( + Arrays.asList("/severe/", "/tmp_retry", "/tmp_severe/", ".temp"))))); + } else { + reconsumedElements = + dlqManager.getReconsumerDataTransform( + pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes()))); + } PCollection> dlqJsonRecords = reconsumedElements .get(DeadLetterQueueManager.RETRYABLE_ERRORS) .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - - boolean isRegularMode = "regular".equals(options.getRunMode()); - if (isRegularMode) { - LOG.info("Regular Datastream flow"); - PCollection> datastreamJsonRecords = pipeline.apply( new DataStreamIO( @@ -533,31 +556,25 @@ public static PipelineResult run(Options options) { .withFileReadConcurrency(options.getFileReadConcurrency()) .withDirectoryWatchDuration( Duration.standardMinutes(options.getDirectoryWatchDurationInMinutes()))); - jsonRecords = PCollectionList.of(datastreamJsonRecords) .and(dlqJsonRecords) .apply(Flatten.pCollections()) .apply("Reshuffle", Reshuffle.viaRandomKey()); } else { - LOG.info("DLQ retry flow"); - jsonRecords = PCollectionList.of(dlqJsonRecords) .apply(Flatten.pCollections()) .apply("Reshuffle", Reshuffle.viaRandomKey()); } - /* * Stage 2: Write records to Cloud Spanner */ - // Ingest transformation context file into memory. 
TransformationContext transformationContext = TransformationContextReader.getTransformationContext( options.getTransformationContextFilePath()); - SpannerTransactionWriter.Result spannerWriteResults = jsonRecords.apply( "Write events to Cloud Spanner", @@ -570,7 +587,6 @@ public static PipelineResult run(Options options) { options.getDatastreamSourceType(), options.getRoundJsonDecimals(), isRegularMode)); - /* * Stage 3: Write failures to GCS Dead Letter Queue * a) Retryable errors are written to retry GCS Dead letter queue @@ -586,21 +602,18 @@ public static PipelineResult run(Options options) { "Write To DLQ", DLQWriteTransform.WriteDLQ.newBuilder() .withDlqDirectory(dlqManager.getRetryDlqDirectoryWithDateTime()) - .withTmpDirectory(dlqManager.getRetryDlqDirectory() + "tmp/") + .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_retry/") .setIncludePaneInfo(true) .build()); - PCollection> dlqErrorRecords = reconsumedElements .get(DeadLetterQueueManager.PERMANENT_ERRORS) .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - PCollection> permanentErrors = PCollectionList.of(dlqErrorRecords) .and(spannerWriteResults.permanentErrors()) .apply(Flatten.pCollections()) .apply("Reshuffle", Reshuffle.viaRandomKey()); - // increment the metrics permanentErrors .apply("Update metrics", ParDo.of(new MetricUpdaterDoFn(isRegularMode))) @@ -612,10 +625,9 @@ public static PipelineResult run(Options options) { "Write To DLQ", DLQWriteTransform.WriteDLQ.newBuilder() .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime()) - .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/") + .withTmpDirectory((options).getDeadLetterQueueDirectory() + "/tmp_severe/") .setIncludePaneInfo(true) .build()); - // Execute the pipeline and return the result. return pipeline.run(); } @@ -625,13 +637,12 @@ private static DeadLetterQueueManager buildDlqManager(Options options) { options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/") ? options.as(DataflowPipelineOptions.class).getTempLocation() : options.as(DataflowPipelineOptions.class).getTempLocation() + "/"; - String dlqDirectory = options.getDeadLetterQueueDirectory().isEmpty() ? tempLocation + "dlq/" : options.getDeadLetterQueueDirectory(); - LOG.info("Dead-letter queue directory: {}", dlqDirectory); + options.setDeadLetterQueueDirectory(dlqDirectory); if ("regular".equals(options.getRunMode())) { return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetryCount()); } else {