Merge branch 'GoogleCloudPlatform:main' into csv_to_bq
keyliug committed Feb 8, 2024
2 parents bb65da2 + c7cb59f commit 97bd1d7
Showing 7 changed files with 192 additions and 81 deletions.
@@ -15,15 +15,12 @@
  */
 package com.google.cloud.teleport.templates.common;

-// import com.google.cloud.teleport.templates.common.BigQueryConverters;
-import com.google.api.services.bigquery.model.TableCell;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.bigquery.TableId;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -161,14 +158,11 @@ public TableSchema getSchema(KV<TableId, TableRow> destination) {
       TableRow bqRow = destination.getValue();
       TableSchema schema = new TableSchema();
       List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
-      List<TableCell> cells = bqRow.getF();
-      for (int i = 0; i < cells.size(); i++) {
-        Map<String, Object> object = cells.get(i);
-        String header = object.keySet().iterator().next();
+      for (String field : bqRow.keySet()) {
         /** currently all BQ data types are set to String */
         // Why do we use checkHeaderName here and not elsewhere, TODO if we add this back in
         // fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING"));
-        fields.add(new TableFieldSchema().setName(header).setType("STRING"));
+        fields.add(new TableFieldSchema().setName(field).setType("STRING"));
       }

       schema.setFields(fields);
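Note on this getSchema change: TableRow.getF() is only populated when the row was built from an explicit List<TableCell>; for rows assembled with TableRow.set(column, value) it returns null, so the old loop could throw a NullPointerException. Because TableRow implements Map<String, Object>, the new loop reads column names directly from keySet(), assuming rows are keyed by column name. A minimal standalone sketch of the resulting behavior (class name and row contents are illustrative):

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import java.util.ArrayList;
    import java.util.List;

    public class SchemaFromRowSketch {

      // Mirrors the loop above: derive an all-STRING schema from the row's column names.
      static TableSchema schemaFor(TableRow bqRow) {
        List<TableFieldSchema> fields = new ArrayList<>();
        for (String field : bqRow.keySet()) {
          fields.add(new TableFieldSchema().setName(field).setType("STRING"));
        }
        return new TableSchema().setFields(fields);
      }

      public static void main(String[] args) {
        // A row keyed by column name; getF() would return null for this row.
        TableRow row = new TableRow().set("id", "42").set("name", "alice");
        System.out.println(schemaFor(row).getFields()); // two fields, both typed STRING
      }
    }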
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -37,14 +38,11 @@
  * A manager for the Dead Letter Queue of a pipeline. It helps build re-consumers, and DLQ sinks.
  */
 public class DeadLetterQueueManager implements Serializable {
-
   private static final Logger LOG = LoggerFactory.getLogger(DeadLetterQueueManager.class);
-
   private static final String DATETIME_FILEPATH_SUFFIX = "YYYY/MM/dd/HH/mm/";
   private final String retryDlqDirectory;
   private final String severeDlqDirectory;
   private final int maxRetries;
-
   /* The tag for change events which were retried over the specified count */
   public static final TupleTag<FailsafeElement<String, String>> PERMANENT_ERRORS =
       new TupleTag<FailsafeElement<String, String>>();
@@ -77,7 +75,6 @@ public static DeadLetterQueueManager create(String dlqDirectory, int maxRetries)

   public static DeadLetterQueueManager create(
       String dlqDirectory, String retryDlqUri, int maxRetries) {
-
     String severeDlqUri =
         FileSystems.matchNewResource(dlqDirectory, true)
             .resolve("severe", StandardResolveOptions.RESOLVE_DIRECTORY)
@@ -109,6 +106,12 @@ public PTransform<PBegin, PCollection<String>> dlqReconsumer(Integer recheckPeriodMinutes)
     return FileBasedDeadLetterQueueReconsumer.create(retryDlqDirectory, recheckPeriodMinutes);
   }

+  public PCollectionTuple getReconsumerDataTransformForFiles(PCollection<Metadata> input) {
+    return getReconsumerDataTransform(
+        input.apply(
+            "Move and consume", FileBasedDeadLetterQueueReconsumer.moveAndConsumeMatches()));
+  }
+
   public PCollectionTuple getReconsumerDataTransform(PCollection<String> reconsumedElements) {
     return reconsumedElements.apply(
         ParDo.of(
@@ -132,7 +135,6 @@ public void process(@Element String input, MultiOutputReceiver output) {
                     output.get(RETRYABLE_ERRORS).output(element);
                     return;
                   }
-
                   String error = jsonDLQElement.get("_metadata_error").asText();
                   element.setErrorMessage(error);
                   output.get(PERMANENT_ERRORS).output(element);
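The new getReconsumerDataTransformForFiles overload accepts file metadata (for example, from the PubSubNotifiedDlqIO transform added below) and runs the files through moveAndConsumeMatches() before the existing string-based retry/permanent split. For orientation, a hedged sketch of consuming the resulting tuple; it assumes RETRYABLE_ERRORS is a public TupleTag declared alongside PERMANENT_ERRORS, and that dlqManager and dlqFiles are built elsewhere:

    // Split reconsumed DLQ records into retry and permanent-failure branches.
    PCollectionTuple reconsumed = dlqManager.getReconsumerDataTransformForFiles(dlqFiles);

    // Still under the retry limit: route back into the pipeline's write path.
    PCollection<FailsafeElement<String, String>> retryable =
        reconsumed.get(DeadLetterQueueManager.RETRYABLE_ERRORS);

    // Retried past maxRetries: tagged with _metadata_error and bound for the severe DLQ.
    PCollection<FailsafeElement<String, String>> permanent =
        reconsumed.get(DeadLetterQueueManager.PERMANENT_ERRORS);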
@@ -0,0 +1,92 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.cdc.dlq;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A DLQ reconsumer for DLQ files that reside in GCS, where each file path arrives as a Pub/Sub
+ * message. Files under specific configured directories, such as temporary and severe, are
+ * skipped, so the ignore list can compensate for misconfigurations in the GCS notifier setup.
+ */
+public class PubSubNotifiedDlqIO extends PTransform<PBegin, PCollection<Metadata>> {
+  private static final Logger LOG = LoggerFactory.getLogger(PubSubNotifiedDlqIO.class);
+
+  private final String gcsNotificationSubscription;
+  private final List<String> filePathsToIgnore;
+
+  public PubSubNotifiedDlqIO(String gcsNotificationSubscription, List<String> filePathsToIgnore) {
+    this.gcsNotificationSubscription = gcsNotificationSubscription;
+    this.filePathsToIgnore = filePathsToIgnore;
+  }
+
+  @Override
+  public PCollection<Metadata> expand(PBegin input) {
+    return input
+        .apply(
+            "ReadGcsPubSubSubscription",
+            PubsubIO.readMessagesWithAttributes().fromSubscription(gcsNotificationSubscription))
+        .apply("ExtractGcsFilePath", ParDo.of(new ExtractGcsFileForRetry()));
+  }
+
+  class ExtractGcsFileForRetry extends DoFn<PubsubMessage, Metadata> {
+    @ProcessElement
+    public void process(ProcessContext context) throws IOException {
+      PubsubMessage message = context.element();
+      String eventType = message.getAttribute("eventType");
+      String bucketId = message.getAttribute("bucketId");
+      String objectId = message.getAttribute("objectId");
+      if (eventType.equals("OBJECT_FINALIZE") && !objectId.endsWith("/")) {
+        String fileName = "gs://" + bucketId + "/" + objectId;
+        try {
+          Metadata fileMetadata = FileSystems.matchSingleFileSpec(fileName);
+          if (filePathsToIgnore != null) {
+            for (String filePathToIgnore : filePathsToIgnore) {
+              if (fileMetadata.resourceId().toString().contains(filePathToIgnore)) {
+                LOG.info(
+                    "Ignoring severe or temporary file during retry processing {} due to ignore"
+                        + " path {} ",
+                    fileName,
+                    filePathToIgnore);
+                return;
+              }
+            }
+          }
+          context.output(fileMetadata);
+        } catch (FileNotFoundException e) {
+          LOG.warn("Ignoring non-existent file {}", fileName, e);
+        } catch (IOException e) {
+          LOG.error("GCS Failure retrieving {}", fileName, e);
+          throw e;
+        }
+      }
+    }
+  }
+}
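A possible end-to-end wiring of this transform with the DLQ manager's new file-based overload; the project, subscription, bucket, retry count, and ignore paths are illustrative assumptions, and the sketch is assumed to live in the same package as the DLQ classes:

    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;

    public class DlqRetryPipelineSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // OBJECT_FINALIZE notifications for the DLQ bucket arrive on this subscription;
        // files under the severe and temporary directories are skipped on retry.
        PCollection<Metadata> dlqFiles =
            pipeline.apply(
                "ReadDlqNotifications",
                new PubSubNotifiedDlqIO(
                    "projects/my-project/subscriptions/dlq-gcs-notifications",
                    Arrays.asList("/severe/", "/temporary/")));

        // Move and consume the notified files, then split into retryable and permanent errors.
        PCollectionTuple reconsumed =
            DeadLetterQueueManager.create("gs://my-bucket/dlq/", 3)
                .getReconsumerDataTransformForFiles(dlqFiles);

        pipeline.run();
      }
    }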
@@ -15,14 +15,12 @@
  */
 package com.google.cloud.teleport.v2.transforms;

-import com.google.api.services.bigquery.model.TableCell;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.bigquery.TableId;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -146,14 +144,11 @@ public TableSchema getSchema(KV<TableId, TableRow> destination) {
       TableRow bqRow = destination.getValue();
       TableSchema schema = new TableSchema();
       List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
-      List<TableCell> cells = bqRow.getF();
-      for (int i = 0; i < cells.size(); i++) {
-        Map<String, Object> object = cells.get(i);
-        String header = object.keySet().iterator().next();
+      for (String field : bqRow.keySet()) {
         /** currently all BQ data types are set to String */
         // Why do we use checkHeaderName here and not elsewhere, TODO if we add this back in
         // fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING"));
-        fields.add(new TableFieldSchema().setName(header).setType("STRING"));
+        fields.add(new TableFieldSchema().setName(field).setType("STRING"));
       }

       schema.setFields(fields);
@@ -15,15 +15,13 @@
  */
 package com.google.cloud.teleport.v2.templates;

-import com.google.api.services.bigquery.model.TableCell;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -147,14 +145,11 @@ public TableSchema getSchema(KV<TableId, TableRow> destination) {
       TableRow bqRow = destination.getValue();
       TableSchema schema = new TableSchema();
       List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
-      List<TableCell> cells = bqRow.getF();
-      /** currently all BQ data types are set to String */
-      for (Map<String, Object> object : cells) {
-        String header = object.keySet().iterator().next();
+      for (String field : bqRow.keySet()) {
+        /** currently all BQ data types are set to String */
         // Why do we use checkHeaderName here and not elsewhere, TODO if we add this back in
         // fields.add(new TableFieldSchema().setName(checkHeaderName(header)).setType("STRING"));
-        fields.add(new TableFieldSchema().setName(header).setType("STRING"));
+        fields.add(new TableFieldSchema().setName(field).setType("STRING"));
       }

       schema.setFields(fields);
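For context on where getSchema is invoked: a DynamicDestinations object like the ones modified in this commit is attached to a BigQueryIO write, which calls getDestination/getTable/getSchema per destination table. A hedged sketch of that attachment (rows and destinations are illustrative names, not code from this commit):

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;

    // rows: PCollection<KV<TableId, TableRow>>; destinations: a DynamicDestinations as above.
    rows.apply(
        "WriteToBigQuery",
        BigQueryIO.<KV<TableId, TableRow>>write()
            .to(destinations)
            .withFormatFunction(KV::getValue)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));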