Skip to content

Commit

Permalink
Enforce csvFormat and delimiter as required parameters for GCS_CSV_to_…
Browse files Browse the repository at this point in the history
…BigQuery template

Change csvFormat to be required so that it's clear to users which format they're using.

Change delimiter to be required due to bug apache/beam#30261,
which prevents us from getting the value of delimiter dynamically. This change is
fine since delimiter is a common parameter in CSV and lots of users set it anyway.

Signed-off-by: keyliug <[email protected]>
  • Loading branch information
keyliug committed Feb 8, 2024
1 parent 0c74348 commit bb65da2
Showing 1 changed file with 1 addition and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -85,24 +83,20 @@ public interface CsvPipelineOptions extends PipelineOptions {

@TemplateParameter.Text(
order = 2,
optional = true,
description = "Column delimiter of the data files.",
helpText =
"The column delimiter of the input text files. Default: use delimiter provided in csvFormat",
example = ",")
@Default.InstanceFactory(DelimiterFactory.class)
ValueProvider<String> getDelimiter();

void setDelimiter(ValueProvider<String> delimiter);

@TemplateParameter.Text(
order = 3,
optional = true,
description = "CSV Format to use for parsing records.",
helpText =
"CSV format specification to use for parsing records. Default is: Default. See https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html for more details. Must match format names exactly found at: "
"CSV format specification to use for parsing records. See https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html for more details. Must match format names exactly found at: "
+ "https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html")
@Default.String("Default")
ValueProvider<String> getCsvFormat();

void setCsvFormat(ValueProvider<String> csvFormat);
Expand All @@ -121,23 +115,6 @@ public interface CsvPipelineOptions extends PipelineOptions {
void setCsvFileEncoding(ValueProvider<String> csvFileEncoding);
}

/**
* Default value factory that derives the delimiter from the Csv format so that, if the user does
* not pass one in, it matches the supplied {@link CsvPipelineOptions#getCsvFormat()}.
*
* <p>The result is a {@link NestedValueProvider} wrapping the csvFormat option rather than a plain
* String. NOTE(review): presumably the lookup runs only when the provider's value is read, not
* when the factory is invoked; confirm against the Beam ValueProvider documentation.
*/
public static class DelimiterFactory implements DefaultValueFactory<ValueProvider<String>> {

/**
* Builds the default delimiter provider from the csvFormat option.
*
* @param options pipeline options, viewed as {@link CsvPipelineOptions} to read csvFormat
* @return a provider that maps the csvFormat name to that format's delimiter, as a String
*/
@Override
public ValueProvider<String> create(PipelineOptions options) {
return NestedValueProvider.of(
options.as(CsvPipelineOptions.class).getCsvFormat(),
csvFormat -> {
// Resolve the format by name; the name has to match a CSVFormat.Predefined entry exactly.
CSVFormat format = CSVFormat.Predefined.valueOf(csvFormat).getFormat();
// Convert the format's delimiter to a String so it fits ValueProvider<String>.
return String.valueOf(format.getDelimiter());
});
}
}

/**
* The {@link ReadCsv} class is a {@link PTransform} that reads from one for more Csv files. The
* transform returns a {@link PCollectionTuple} consisting of the following {@link PCollection}:
Expand Down

0 comments on commit bb65da2

Please sign in to comment.