diff --git a/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java b/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java index 9d696e9a3d..1c9dbbe79d 100644 --- a/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java +++ b/v1/src/main/java/com/google/cloud/teleport/templates/common/BigQueryDynamicConverters.java @@ -15,15 +15,12 @@ */ package com.google.cloud.teleport.templates.common; -// import com.google.cloud.teleport.templates.common.BigQueryConverters; -import com.google.api.services.bigquery.model.TableCell; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.bigquery.TableId; import java.util.ArrayList; import java.util.List; -import java.util.Map; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.options.ValueProvider; @@ -161,14 +158,11 @@ public TableSchema getSchema(KV destination) { TableRow bqRow = destination.getValue(); TableSchema schema = new TableSchema(); List fields = new ArrayList(); - List cells = bqRow.getF(); - for (int i = 0; i < cells.size(); i++) { - Map object = cells.get(i); - String header = object.keySet().iterator().next(); + for (String field : bqRow.keySet()) { /** currently all BQ data types are set to String */ // Why do we use checkHeaderName here and not elsewhere, TODO if we add this back in // fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING")); - fields.add(new TableFieldSchema().setName(header).setType("STRING")); + fields.add(new TableFieldSchema().setName(field).setType("STRING")); } schema.setFields(fields); diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/DeadLetterQueueManager.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/DeadLetterQueueManager.java index ecb35fe0ab..f1dd2eb237 100644 --- a/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/DeadLetterQueueManager.java +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/DeadLetterQueueManager.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.Serializable; import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -37,14 +38,11 @@ * A manager for the Dead Letter Queue of a pipeline. It helps build re-consumers, and DLQ sinks. */ public class DeadLetterQueueManager implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(DeadLetterQueueManager.class); - private static final String DATETIME_FILEPATH_SUFFIX = "YYYY/MM/dd/HH/mm/"; private final String retryDlqDirectory; private final String severeDlqDirectory; private final int maxRetries; - /* The tag for change events which were retried over the specified count */ public static final TupleTag> PERMANENT_ERRORS = new TupleTag>(); @@ -77,7 +75,6 @@ public static DeadLetterQueueManager create(String dlqDirectory, int maxRetries) public static DeadLetterQueueManager create( String dlqDirectory, String retryDlqUri, int maxRetries) { - String severeDlqUri = FileSystems.matchNewResource(dlqDirectory, true) .resolve("severe", StandardResolveOptions.RESOLVE_DIRECTORY) @@ -109,6 +106,12 @@ public PTransform> dlqReconsumer(Integer recheckPeri return FileBasedDeadLetterQueueReconsumer.create(retryDlqDirectory, recheckPeriodMinutes); } + public PCollectionTuple getReconsumerDataTransformForFiles(PCollection input) { + return getReconsumerDataTransform( + input.apply( + "Move and consume", FileBasedDeadLetterQueueReconsumer.moveAndConsumeMatches())); + } + public PCollectionTuple getReconsumerDataTransform(PCollection reconsumedElements) { return reconsumedElements.apply( ParDo.of( @@ -132,7 +135,6 @@ public void process(@Element String input, MultiOutputReceiver output) { output.get(RETRYABLE_ERRORS).output(element); return; } - String error = jsonDLQElement.get("_metadata_error").asText(); element.setErrorMessage(error); output.get(PERMANENT_ERRORS).output(element); diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/PubSubNotifiedDlqIO.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/PubSubNotifiedDlqIO.java new file mode 100644 index 0000000000..98b4dbabe3 --- /dev/null +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/cdc/dlq/PubSubNotifiedDlqIO.java @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.cdc.dlq; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.List; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a DLQ reconsumer where the DLQ files reside in the GCS and the file path comes as a + * PubSub message. It also skips files under specific configured directories , such as temporary and + * severe, so that even if there are misconfigrations done when setting the GCS notifier, the list + * of directories to ignore can be configured. + */ +public class PubSubNotifiedDlqIO extends PTransform> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubNotifiedDlqIO.class); + + private final String gcsNotificationSubscription; + private final List filePathsToIgnore; + + public PubSubNotifiedDlqIO(String gcsNotificationSubscription, List filePathsToIgnore) { + this.gcsNotificationSubscription = gcsNotificationSubscription; + this.filePathsToIgnore = filePathsToIgnore; + } + + @Override + public PCollection expand(PBegin input) { + return input + .apply( + "ReadGcsPubSubSubscription", + PubsubIO.readMessagesWithAttributes().fromSubscription(gcsNotificationSubscription)) + .apply("ExtractGcsFilePath", ParDo.of(new ExtractGcsFileForRetry())); + } + + class ExtractGcsFileForRetry extends DoFn { + @ProcessElement + public void process(ProcessContext context) throws IOException { + PubsubMessage message = context.element(); + String eventType = message.getAttribute("eventType"); + String bucketId = message.getAttribute("bucketId"); + String objectId = message.getAttribute("objectId"); + if (eventType.equals("OBJECT_FINALIZE") && !objectId.endsWith("/")) { + String fileName = "gs://" + bucketId + "/" + objectId; + try { + Metadata fileMetadata = FileSystems.matchSingleFileSpec(fileName); + if (filePathsToIgnore != null) { + for (String filePathToIgnore : filePathsToIgnore) { + if (fileMetadata.resourceId().toString().contains(filePathToIgnore)) { + LOG.info( + "Ignoring severe or temporary file during retry processing {} due to ignore" + + " path {} ", + fileName, + filePathToIgnore); + return; + } + } + } + context.output(fileMetadata); + } catch (FileNotFoundException e) { + LOG.warn("Ignoring non-existent file {}", fileName, e); + } catch (IOException e) { + LOG.error("GCS Failure retrieving {}", fileName, e); + throw e; + } + } + } + } +} diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicConverters.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicConverters.java index 47c1b44b43..6b25a8ad6c 100644 --- a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicConverters.java +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryDynamicConverters.java @@ -15,14 +15,12 @@ */ package com.google.cloud.teleport.v2.transforms; -import com.google.api.services.bigquery.model.TableCell; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.bigquery.TableId; import java.util.ArrayList; import java.util.List; -import java.util.Map; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.transforms.MapElements; @@ -146,14 +144,11 @@ public TableSchema getSchema(KV destination) { TableRow bqRow = destination.getValue(); TableSchema schema = new TableSchema(); List fields = new ArrayList(); - List cells = bqRow.getF(); - for (int i = 0; i < cells.size(); i++) { - Map object = cells.get(i); - String header = object.keySet().iterator().next(); + for (String field : bqRow.keySet()) { /** currently all BQ data types are set to String */ // Why do we use checkHeaderName here and not elsewhere, TODO if we add this back in // fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING")); - fields.add(new TableFieldSchema().setName(header).setType("STRING")); + fields.add(new TableFieldSchema().setName(field).setType("STRING")); } schema.setFields(fields); diff --git a/v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryDynamicConverters.java b/v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryDynamicConverters.java index 63112afb3e..0ae14227cc 100644 --- a/v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryDynamicConverters.java +++ b/v2/datastream-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryDynamicConverters.java @@ -15,7 +15,6 @@ */ package com.google.cloud.teleport.v2.templates; -import com.google.api.services.bigquery.model.TableCell; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; @@ -23,7 +22,6 @@ import com.google.cloud.teleport.v2.transforms.BigQueryConverters; import java.util.ArrayList; import java.util.List; -import java.util.Map; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.transforms.MapElements; @@ -147,14 +145,11 @@ public TableSchema getSchema(KV destination) { TableRow bqRow = destination.getValue(); TableSchema schema = new TableSchema(); List fields = new ArrayList(); - List cells = bqRow.getF(); - /** currently all BQ data types are set to String */ - for (Map object : cells) { - String header = object.keySet().iterator().next(); + for (String field : bqRow.keySet()) { /** currently all BQ data types are set to String */ // Why do we use checkHeaderName here and not elsewhere, TODO if we add this back in // fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING")); - fields.add(new TableFieldSchema().setName(header).setType("STRING")); + fields.add(new TableFieldSchema().setName(field).setType("STRING")); } schema.setFields(fields); diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java index a30690c1e2..def15f9104 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamToSpanner.java @@ -16,11 +16,13 @@ package com.google.cloud.teleport.v2.templates; import com.google.api.services.datastream.v1.model.SourceConfig; +import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.teleport.metadata.Template; import com.google.cloud.teleport.metadata.TemplateCategory; import com.google.cloud.teleport.metadata.TemplateParameter; import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption; import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager; +import com.google.cloud.teleport.v2.cdc.dlq.PubSubNotifiedDlqIO; import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer; import com.google.cloud.teleport.v2.coders.FailsafeElementCoder; import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger; @@ -36,7 +38,10 @@ import com.google.cloud.teleport.v2.templates.spanner.ProcessInformationSchema; import com.google.cloud.teleport.v2.transforms.DLQWriteTransform; import com.google.cloud.teleport.v2.values.FailsafeElement; +import com.google.common.base.Strings; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -78,19 +83,30 @@ category = TemplateCategory.STREAMING, displayName = "Datastream to Cloud Spanner", description = { - "The Datastream to Cloud Spanner template is a streaming pipeline that reads Datastream events from a Cloud Storage bucket and writes them to a Cloud Spanner database. " - + "It is intended for data migration from Datastream sources to Cloud Spanner.\n", - "All tables required for migration must exist in the destination Cloud Spanner database prior to template execution. " - + "Hence schema migration from a source database to destination Cloud Spanner must be completed prior to data migration. " - + "Data can exist in the tables prior to migration. This template does not propagate Datastream schema changes to the Cloud Spanner database.\n", - "Data consistency is guaranteed only at the end of migration when all data has been written to Cloud Spanner. " - + "To store ordering information for each record written to Cloud Spanner, this template creates an additional table (called a shadow table) for each table in the Cloud Spanner database. " - + "This is used to ensure consistency at the end of migration. The shadow tables are not deleted after migration and can be used for validation purposes at the end of migration.\n", - "Any errors that occur during operation, such as schema mismatches, malformed JSON files, or errors resulting from executing transforms, are recorded in an error queue. " - + "The error queue is a Cloud Storage folder which stores all the Datastream events that had encountered errors along with the error reason in text format. " - + "The errors can be transient or permanent and are stored in appropriate Cloud Storage folders in the error queue. " - + "The transient errors are retried automatically while the permanent errors are not. " - + "In case of permanent errors, you have the option of making corrections to the change events and moving them to the retriable bucket while the template is running." + "The Datastream to Cloud Spanner template is a streaming pipeline that reads Datastream events from a Cloud" + + " Storage bucket and writes them to a Cloud Spanner database. It is intended for data" + + " migration from Datastream sources to Cloud Spanner.\n", + "All tables required for migration must exist in the destination Cloud Spanner database prior" + + " to template execution. Hence schema migration from a source database to destination" + + " Cloud Spanner must be completed prior to data migration. Data can exist in the tables" + + " prior to migration. This template does not propagate Datastream schema changes to the" + + " Cloud Spanner database.\n", + "Data consistency is guaranteed only at the end of migration when all data has been written" + + " to Cloud Spanner. To store ordering information for each record written to Cloud" + + " Spanner, this template creates an additional table (called a shadow table) for each" + + " table in the Cloud Spanner database. This is used to ensure consistency at the end of" + + " migration. The shadow tables are not deleted after migration and can be used for" + + " validation purposes at the end of migration.\n", + "Any errors that occur during operation, such as schema mismatches, malformed JSON files, or" + + " errors resulting from executing transforms, are recorded in an error queue. The error" + + " queue is a Cloud Storage folder which stores all the Datastream events that had" + + " encountered errors along with the error reason in text format. The errors can be" + + " transient or permanent and are stored in appropriate Cloud Storage folders in the" + + " error queue. The transient errors are retried automatically while the permanent" + + " errors are not. In case of permanent errors, you have the option of making" + + " corrections to the change events and moving them to the retriable bucket while the" + + " template is running." }, optionsClass = Options.class, flexContainerName = "datastream-to-spanner", @@ -104,7 +120,6 @@ }, streaming = true) public class DataStreamToSpanner { - private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpanner.class); private static final String AVRO_SUFFIX = "avro"; private static final String JSON_SUFFIX = "json"; @@ -115,7 +130,6 @@ public class DataStreamToSpanner { *

Inherits standard configuration options. */ public interface Options extends PipelineOptions, StreamingOptions { - @TemplateParameter.Text( order = 1, description = "File location for Datastream file output in Cloud Storage.", @@ -361,6 +375,38 @@ public interface Options extends PipelineOptions, StreamingOptions { Integer getDirectoryWatchDurationInMinutes(); void setDirectoryWatchDurationInMinutes(Integer value); + + @TemplateParameter.Enum( + order = 23, + enumOptions = { + @TemplateEnumOption("LOW"), + @TemplateEnumOption("MEDIUM"), + @TemplateEnumOption("HIGH") + }, + optional = true, + description = "Priority for Spanner RPC invocations", + helpText = + "The request priority for Cloud Spanner calls. The value must be one of:" + + " [HIGH,MEDIUM,LOW]. Defaults to HIGH") + @Default.Enum("HIGH") + RpcPriority getSpannerPriority(); + + void setSpannerPriority(RpcPriority value); + + @TemplateParameter.PubsubSubscription( + order = 24, + optional = true, + description = + "The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ" + + " retry directory when running in regular mode.", + helpText = + "The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ" + + " retry directory when running in regular mode. The name should be in the format" + + " of projects//subscriptions/. When set, the" + + " deadLetterQueueDirectory and dlqRetryMinutes are ignored.") + String getDlqGcsPubSubSubscription(); + + void setDlqGcsPubSubSubscription(String value); } private static void validateSourceType(Options options) { @@ -384,11 +430,9 @@ private static String getSourceType(Options options) { if (options.getDatastreamSourceType() != null) { return options.getDatastreamSourceType(); } - if (options.getStreamName() == null) { throw new IllegalArgumentException("Stream name cannot be empty. "); } - GcpOptions gcpOptions = options.as(GcpOptions.class); DataStreamClient datastreamClient; SourceConfig sourceConfig; @@ -399,7 +443,6 @@ private static String getSourceType(Options options) { LOG.error("IOException Occurred: DataStreamClient failed initialization."); throw new IllegalArgumentException("Unable to initialize DatastreamClient: " + e); } - // TODO: use getPostgresSourceConfig() instead of an else once SourceConfig.java is updated. if (sourceConfig.getMysqlSourceConfig() != null) { return DatastreamConstants.MYSQL_SOURCE_TYPE; @@ -408,7 +451,6 @@ private static String getSourceType(Options options) { } else { return DatastreamConstants.POSTGRES_SOURCE_TYPE; } - // LOG.error("Source Connection Profile Type Not Supported"); // throw new IllegalArgumentException("Unsupported source connection profile type in // Datastream"); @@ -421,15 +463,10 @@ private static String getSourceType(Options options) { */ public static void main(String[] args) { UncaughtExceptionLogger.register(); - LOG.info("Starting DataStream to Cloud Spanner"); - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setStreaming(true); - validateSourceType(options); - run(options); } @@ -446,10 +483,8 @@ public static PipelineResult run(Options options) { * 2) Write JSON Strings to Cloud Spanner * 3) Write Failures to GCS Dead Letter Queue */ - Pipeline pipeline = Pipeline.create(options); DeadLetterQueueManager dlqManager = buildDlqManager(options); - // Ingest session file into schema object. Schema schema = SessionFileReader.read(options.getSessionFilePath()); /* @@ -460,14 +495,15 @@ public static PipelineResult run(Options options) { * c) Reconsume Dead Letter Queue data from GCS into JSON String FailsafeElements * d) Flatten DataStream and DLQ Streams */ + // Prepare Spanner config SpannerConfig spannerConfig = SpannerConfig.create() .withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId())) .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost())) .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId())) - .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId())); - + .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId())) + .withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority())); /* Process information schema * 1) Read information schema from destination Cloud Spanner database * 2) Check if shadow tables are present and create if necessary @@ -482,26 +518,33 @@ public static PipelineResult run(Options options) { options.getShadowTablePrefix(), options.getDatastreamSourceType())); PCollectionView ddlView = ddl.apply("Cloud Spanner DDL as view", View.asSingleton()); - PCollection> jsonRecords = null; - // Elements sent to the Dead Letter Queue are to be reconsumed. // A DLQManager is to be created using PipelineOptions, and it is in charge // of building pieces of the DLQ. - PCollectionTuple reconsumedElements = - dlqManager.getReconsumerDataTransform( - pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes()))); + PCollectionTuple reconsumedElements = null; + boolean isRegularMode = "regular".equals(options.getRunMode()); + if (isRegularMode && (!Strings.isNullOrEmpty(options.getDlqGcsPubSubSubscription()))) { + reconsumedElements = + dlqManager.getReconsumerDataTransformForFiles( + pipeline.apply( + "Read retry from PubSub", + new PubSubNotifiedDlqIO( + options.getDlqGcsPubSubSubscription(), + // file paths to ignore when re-consuming for retry + new ArrayList( + Arrays.asList("/severe/", "/tmp_retry", "/tmp_severe/", ".temp"))))); + } else { + reconsumedElements = + dlqManager.getReconsumerDataTransform( + pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes()))); + } PCollection> dlqJsonRecords = reconsumedElements .get(DeadLetterQueueManager.RETRYABLE_ERRORS) .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - - boolean isRegularMode = "regular".equals(options.getRunMode()); - if (isRegularMode) { - LOG.info("Regular Datastream flow"); - PCollection> datastreamJsonRecords = pipeline.apply( new DataStreamIO( @@ -513,31 +556,25 @@ public static PipelineResult run(Options options) { .withFileReadConcurrency(options.getFileReadConcurrency()) .withDirectoryWatchDuration( Duration.standardMinutes(options.getDirectoryWatchDurationInMinutes()))); - jsonRecords = PCollectionList.of(datastreamJsonRecords) .and(dlqJsonRecords) .apply(Flatten.pCollections()) .apply("Reshuffle", Reshuffle.viaRandomKey()); } else { - LOG.info("DLQ retry flow"); - jsonRecords = PCollectionList.of(dlqJsonRecords) .apply(Flatten.pCollections()) .apply("Reshuffle", Reshuffle.viaRandomKey()); } - /* * Stage 2: Write records to Cloud Spanner */ - // Ingest transformation context file into memory. TransformationContext transformationContext = TransformationContextReader.getTransformationContext( options.getTransformationContextFilePath()); - SpannerTransactionWriter.Result spannerWriteResults = jsonRecords.apply( "Write events to Cloud Spanner", @@ -550,7 +587,6 @@ public static PipelineResult run(Options options) { options.getDatastreamSourceType(), options.getRoundJsonDecimals(), isRegularMode)); - /* * Stage 3: Write failures to GCS Dead Letter Queue * a) Retryable errors are written to retry GCS Dead letter queue @@ -566,21 +602,18 @@ public static PipelineResult run(Options options) { "Write To DLQ", DLQWriteTransform.WriteDLQ.newBuilder() .withDlqDirectory(dlqManager.getRetryDlqDirectoryWithDateTime()) - .withTmpDirectory(dlqManager.getRetryDlqDirectory() + "tmp/") + .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_retry/") .setIncludePaneInfo(true) .build()); - PCollection> dlqErrorRecords = reconsumedElements .get(DeadLetterQueueManager.PERMANENT_ERRORS) .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - PCollection> permanentErrors = PCollectionList.of(dlqErrorRecords) .and(spannerWriteResults.permanentErrors()) .apply(Flatten.pCollections()) .apply("Reshuffle", Reshuffle.viaRandomKey()); - // increment the metrics permanentErrors .apply("Update metrics", ParDo.of(new MetricUpdaterDoFn(isRegularMode))) @@ -592,10 +625,9 @@ public static PipelineResult run(Options options) { "Write To DLQ", DLQWriteTransform.WriteDLQ.newBuilder() .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime()) - .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/") + .withTmpDirectory((options).getDeadLetterQueueDirectory() + "/tmp_severe/") .setIncludePaneInfo(true) .build()); - // Execute the pipeline and return the result. return pipeline.run(); } @@ -605,13 +637,12 @@ private static DeadLetterQueueManager buildDlqManager(Options options) { options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/") ? options.as(DataflowPipelineOptions.class).getTempLocation() : options.as(DataflowPipelineOptions.class).getTempLocation() + "/"; - String dlqDirectory = options.getDeadLetterQueueDirectory().isEmpty() ? tempLocation + "dlq/" : options.getDeadLetterQueueDirectory(); - LOG.info("Dead-letter queue directory: {}", dlqDirectory); + options.setDeadLetterQueueDirectory(dlqDirectory); if ("regular".equals(options.getRunMode())) { return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetryCount()); } else { diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java index 9484c99f5f..1598c1a677 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java @@ -213,7 +213,9 @@ public void processElement(ProcessContext c) { // Start transaction spannerAccessor .getDatabaseClient() - .readWriteTransaction(Options.tag(getTxnTag(c.getPipelineOptions()))) + .readWriteTransaction( + Options.tag(getTxnTag(c.getPipelineOptions())), + Options.priority(spannerConfig.getRpcPriority().get())) .run( (TransactionCallable) transaction -> {