Skip to content

Commit

Permalink
Merge pull request #1321 from keyliug:csv_to_bq
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 605647693
  • Loading branch information
cloud-teleport committed Feb 9, 2024
2 parents c7cb59f + 97bd1d7 commit 0072eed
Showing 1 changed file with 1 addition and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -85,24 +83,20 @@ public interface CsvPipelineOptions extends PipelineOptions {

@TemplateParameter.Text(
order = 2,
optional = true,
description = "Column delimiter of the data files.",
helpText =
"The column delimiter of the input text files. Default: use delimiter provided in csvFormat",
example = ",")
@Default.InstanceFactory(DelimiterFactory.class)
ValueProvider<String> getDelimiter();

void setDelimiter(ValueProvider<String> delimiter);

@TemplateParameter.Text(
order = 3,
optional = true,
description = "CSV Format to use for parsing records.",
helpText =
"CSV format specification to use for parsing records. Default is: Default. See https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html for more details. Must match format names exactly found at: "
"CSV format specification to use for parsing records. See https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html for more details. Must match format names exactly found at: "
+ "https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html")
@Default.String("Default")
ValueProvider<String> getCsvFormat();

void setCsvFormat(ValueProvider<String> csvFormat);
Expand All @@ -121,23 +115,6 @@ public interface CsvPipelineOptions extends PipelineOptions {
void setCsvFileEncoding(ValueProvider<String> csvFileEncoding);
}

/**
* Default value factory to get delimiter from Csv format so that if the user does not pass one
* in, it matches the supplied {@link CsvPipelineOptions#getCsvFormat()}.
*/
public static class DelimiterFactory implements DefaultValueFactory<ValueProvider<String>> {

@Override
public ValueProvider<String> create(PipelineOptions options) {
return NestedValueProvider.of(
options.as(CsvPipelineOptions.class).getCsvFormat(),
csvFormat -> {
CSVFormat format = CSVFormat.Predefined.valueOf(csvFormat).getFormat();
return String.valueOf(format.getDelimiter());
});
}
}

/**
* The {@link ReadCsv} class is a {@link PTransform} that reads from one for more Csv files. The
* transform returns a {@link PCollectionTuple} consisting of the following {@link PCollection}:
Expand Down

0 comments on commit 0072eed

Please sign in to comment.