diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java index 87e0128d73ebd..dd9ef5b348686 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java @@ -24,12 +24,7 @@ import java.util.Optional; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.errorhandling.BadRecord; -import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.commons.csv.CSVFormat; @@ -37,10 +32,6 @@ @AutoValue abstract class CsvIOParseConfiguration implements Serializable { - /** A Dead Letter Queue that returns potential errors with {@link BadRecord}. 
*/ - final PTransform, PCollection> errorHandlerTransform = - new BadRecordOutput(); - static Builder builder() { return new AutoValue_CsvIOParseConfiguration.Builder<>(); } @@ -84,20 +75,4 @@ final CsvIOParseConfiguration build() { return autoBuild(); } } - - private static class BadRecordOutput - extends PTransform, PCollection> { - - @Override - public PCollection expand(PCollection input) { - return input.apply(ParDo.of(new BadRecordTransformFn())); - } - - private static class BadRecordTransformFn extends DoFn { - @ProcessElement - public void process(@Element BadRecord input, OutputReceiver receiver) { - receiver.output(input); - } - } - } } diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java index ad7d05912faad..7a2be9786d78a 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java @@ -17,9 +17,18 @@ */ package org.apache.beam.sdk.io.csv; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaProvider; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -36,6 +45,21 @@ static Builder builder() { return new AutoValue_CsvIOParseError.Builder(); } + private static final SchemaProvider SCHEMA_PROVIDER = new AutoValueSchema(); + + private static final TypeDescriptor TYPE = + 
TypeDescriptor.of(CsvIOParseError.class); + + private static final Schema SCHEMA = checkStateNotNull(SCHEMA_PROVIDER.schemaFor(TYPE)); + + private static final SerializableFunction TO_ROW_FN = + checkStateNotNull(SCHEMA_PROVIDER.toRowFunction(TYPE)); + + private static final SerializableFunction FROM_ROW_FN = + checkStateNotNull(SCHEMA_PROVIDER.fromRowFunction(TYPE)); + + static final Coder CODER = SchemaCoder.of(SCHEMA, TYPE, TO_ROW_FN, FROM_ROW_FN); + /** The caught {@link Exception#getMessage()}. */ public abstract String getMessage(); diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseKV.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseKV.java index 1b8e43314b149..6ddafdccd9fac 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseKV.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseKV.java @@ -17,9 +17,8 @@ */ package org.apache.beam.sdk.io.csv; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.csv.CSVRecord; @@ -31,11 +30,17 @@ // TODO(https://github.com/apache/beam/issues/31873): implement class after all dependencies are // completed. class CsvIOParseKV - extends PTransform>>, PCollection> { + extends PTransform>>, CsvIOParseResult> { + + private final Coder outputCoder; + + private CsvIOParseKV(Coder outputCoder) { + this.outputCoder = outputCoder; + } // TODO(https://github.com/apache/beam/issues/31873): implement method. 
@Override - public PCollection expand(PCollection>> input) { - return input.apply(ParDo.of(new DoFn>, T>() {})); + public CsvIOParseResult expand(PCollection>> input) { + return CsvIOParseResult.empty(input.getPipeline(), outputCoder); } } diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java new file mode 100644 index 0000000000000..77264fccd2c9f --- /dev/null +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.csv; + +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +/** + * The {@code T} and {@link CsvIOParseError} {@link PCollection} results of parsing CSV records. Use + * {@link #getOutput()} and {@link #getErrors()} to apply these results in a pipeline. + */ +public class CsvIOParseResult implements POutput { + + static CsvIOParseResult of( + TupleTag outputTag, + Coder outputCoder, + TupleTag errorTag, + PCollectionTuple pct) { + return new CsvIOParseResult<>(outputTag, outputCoder, errorTag, pct); + } + + static CsvIOParseResult empty(Pipeline pipeline, Coder outputCoder) { + return new CsvIOParseResult<>( + new TupleTag() {}, + outputCoder, + new TupleTag() {}, + PCollectionTuple.empty(pipeline)); + } + + private final Pipeline pipeline; + private final TupleTag outputTag; + private final PCollection output; + private final TupleTag errorTag; + private final PCollection errors; + + private CsvIOParseResult( + TupleTag outputTag, + Coder outputCoder, + TupleTag errorTag, + PCollectionTuple pct) { + this.outputTag = outputTag; + this.errorTag = errorTag; + this.pipeline = pct.getPipeline(); + this.output = pct.get(outputTag).setCoder(outputCoder); + this.errors = pct.get(errorTag).setCoder(CsvIOParseError.CODER); + } + + /** The {@code T} {@link PCollection} as a result of successfully parsing CSV records. 
*/ + public PCollection getOutput() { + return output; + } + + /** + * The {@link CsvIOParseError} {@link PCollection} as a result of errors associated with parsing + * CSV records. + */ + public PCollection getErrors() { + return errors; + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public Map, PValue> expand() { + return ImmutableMap.of( + outputTag, output, + errorTag, errors); + } + + @Override + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform transform) {} +} diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjects.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjects.java index 4340b68f3c497..97bceb4793454 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjects.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjects.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.DoFn; @@ -27,13 +28,18 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables; +import org.joda.time.Instant; /** * {@link CsvIORecordToObjects} is a class that takes an input of {@link PCollection>} * and outputs custom type {@link PCollection}. */ -class CsvIORecordToObjects extends PTransform>, PCollection> { +class CsvIORecordToObjects extends PTransform>, CsvIOParseResult> { /** The expected {@link Schema} of the target type. 
*/ private final Schema schema; @@ -44,6 +50,10 @@ class CsvIORecordToObjects extends PTransform>, PCol /** A {@link Map} of {@link Schema.Field}s to their expected positions within the CSV record. */ private final Map indexToFieldMap; + private final TupleTag outputTag = new TupleTag() {}; + + private final TupleTag errorTag = new TupleTag() {}; + /** * A {@link SerializableFunction} that converts from {@link Row} to {@link Schema} mapped custom * type. @@ -63,23 +73,40 @@ class CsvIORecordToObjects extends PTransform>, PCol } @Override - public PCollection expand(PCollection> input) { - return input.apply(ParDo.of(new RecordToObjectsFn())).setCoder(coder); + public CsvIOParseResult expand(PCollection> input) { + PCollectionTuple pct = + input.apply( + RecordToObjectsFn.class.getSimpleName(), + ParDo.of(new RecordToObjectsFn()).withOutputTags(outputTag, TupleTagList.of(errorTag))); + + return CsvIOParseResult.of(outputTag, coder, errorTag, pct); } private class RecordToObjectsFn extends DoFn, T> { @ProcessElement - public void process(@Element List record, OutputReceiver receiver) { + public void process(@Element List record, MultiOutputReceiver receiver) { Map fieldNamesToValues = new HashMap<>(); - for (Map.Entry entry : indexToFieldMap.entrySet()) { - Schema.Field field = entry.getValue(); - int index = entry.getKey(); - String cell = record.get(index); - Object value = parseCell(cell, field); - fieldNamesToValues.put(field.getName(), value); + try { + for (Map.Entry entry : indexToFieldMap.entrySet()) { + Schema.Field field = entry.getValue(); + int index = entry.getKey(); + String cell = record.get(index); + Object value = parseCell(cell, field); + fieldNamesToValues.put(field.getName(), value); + } + Row row = Row.withSchema(schema).withFieldValues(fieldNamesToValues).build(); + receiver.get(outputTag).output(fromRowFn.apply(row)); + } catch (RuntimeException e) { + receiver + .get(errorTag) + .output( + CsvIOParseError.builder() + 
.setCsvRecord(record.toString()) + .setMessage(Optional.ofNullable(e.getMessage()).orElse("")) + .setStackTrace(Throwables.getStackTraceAsString(e)) + .setObservedTimestamp(Instant.now()) + .build()); } - Row row = Row.withSchema(schema).withFieldValues(fieldNamesToValues).build(); - receiver.output(fromRowFn.apply(row)); } } diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java index b5ce6a0fec229..5fc4954cb450c 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecord.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -27,9 +28,14 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; +import org.joda.time.Instant; /** * {@link CsvIOStringToCsvRecord} is a class that takes a {@link PCollection} input and @@ -37,9 +43,14 @@ * targeted error detection. 
*/ final class CsvIOStringToCsvRecord - extends PTransform, PCollection>> { + extends PTransform, CsvIOParseResult>> { + private final CSVFormat csvFormat; + private final TupleTag> outputTag = new TupleTag>() {}; + + private final TupleTag errorTag = new TupleTag() {}; + CsvIOStringToCsvRecord(CSVFormat csvFormat) { this.csvFormat = csvFormat; } @@ -49,10 +60,15 @@ final class CsvIOStringToCsvRecord * to Row or custom type. */ @Override - public PCollection> expand(PCollection input) { - return input - .apply(ParDo.of(new ProcessLineToRecordFn())) - .setCoder(ListCoder.of(NullableCoder.of(StringUtf8Coder.of()))); + public CsvIOParseResult> expand(PCollection input) { + PCollectionTuple pct = + input.apply( + ProcessLineToRecordFn.class.getSimpleName(), + ParDo.of(new ProcessLineToRecordFn()) + .withOutputTags(outputTag, TupleTagList.of(errorTag))); + + return CsvIOParseResult.of( + outputTag, ListCoder.of(NullableCoder.of(StringUtf8Coder.of())), errorTag, pct); } /** Processes each line in order to convert it to a {@link CSVRecord}. 
*/ @@ -60,13 +76,24 @@ private class ProcessLineToRecordFn extends DoFn> { private final String headerLine = headerLine(csvFormat); @ProcessElement - public void process(@Element String line, OutputReceiver> receiver) - throws IOException { + public void process(@Element String line, MultiOutputReceiver receiver) { if (headerLine.equals(line)) { return; } - for (CSVRecord record : CSVParser.parse(line, csvFormat).getRecords()) { - receiver.output(csvRecordtoList(record)); + try (CSVParser csvParser = CSVParser.parse(line, csvFormat)) { + for (CSVRecord record : csvParser.getRecords()) { + receiver.get(outputTag).output(csvRecordtoList(record)); + } + } catch (IOException e) { + receiver + .get(errorTag) + .output( + CsvIOParseError.builder() + .setCsvRecord(line) + .setMessage(Optional.ofNullable(e.getMessage()).orElse("")) + .setObservedTimestamp(Instant.now()) + .setStackTrace(Throwables.getStackTraceAsString(e)) + .build()); } } } diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjectsTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjectsTest.java index eb8cacdec5abb..9ccb5d0c7bc25 100644 --- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjectsTest.java +++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIORecordToObjectsTest.java @@ -35,7 +35,6 @@ import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContaining; import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingFromRowFn; import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingToRowFn; -import static org.junit.Assert.assertThrows; import java.math.BigDecimal; import java.util.ArrayList; @@ -55,6 +54,7 @@ import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; 
import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.SerializableUtils; @@ -135,7 +135,9 @@ public void parsesToRows() { emptyCustomProcessingMap(), ROW_ROW_SERIALIZABLE_FUNCTION, ALL_PRIMITIVE_DATA_TYPES_ROW_CODER); - PAssert.that(input.apply(underTest)).containsInAnyOrder(want); + CsvIOParseResult result = input.apply(underTest); + PAssert.that(result.getOutput()).containsInAnyOrder(want); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -152,7 +154,9 @@ public void parsesToPojos() { emptyCustomProcessingMap(), allPrimitiveDataTypesFromRowFn(), ALL_PRIMITIVE_DATA_TYPES_CODER); - PAssert.that(input.apply(underTest)).containsInAnyOrder(want); + CsvIOParseResult result = input.apply(underTest); + PAssert.that(result.getOutput()).containsInAnyOrder(want); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -176,7 +180,9 @@ public void givenNullableField_containsNullCell_parsesToRows() { emptyCustomProcessingMap(), ROW_ROW_SERIALIZABLE_FUNCTION, NULLABLE_ALL_PRIMITIVE_DATA_TYPES_ROW_CODER); - PAssert.that(input.apply(underTest)).containsInAnyOrder(want); + CsvIOParseResult result = input.apply(underTest); + PAssert.that(result.getOutput()).containsInAnyOrder(want); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -193,7 +199,9 @@ public void givenNullableField_containsNullCell_parsesToPojos() { emptyCustomProcessingMap(), nullableAllPrimitiveDataTypesFromRowFn(), NULLABLE_ALL_PRIMITIVE_DATA_TYPES_CODER); - PAssert.that(input.apply(underTest)).containsInAnyOrder(want); + CsvIOParseResult result = input.apply(underTest); + PAssert.that(result.getOutput()).containsInAnyOrder(want); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -214,8 +222,10 @@ public void givenNoNullableField_containsNullCell_throws() { emptyCustomProcessingMap(), allPrimitiveDataTypesFromRowFn(), ALL_PRIMITIVE_DATA_TYPES_CODER); - input.apply(underTest); - 
assertThrows(Pipeline.PipelineExecutionException.class, pipeline::run); + CsvIOParseResult result = input.apply(underTest); + PAssert.that(result.getOutput()).empty(); + PAssert.thatSingleton(result.getErrors().apply(Count.globally())).isEqualTo(1L); + pipeline.run(); } @Test @@ -228,7 +238,9 @@ public void givenAllNullableFields_emptyRecord_parsesToRows() { emptyCustomProcessingMap(), ROW_ROW_SERIALIZABLE_FUNCTION, NULLABLE_ALL_PRIMITIVE_DATA_TYPES_ROW_CODER); - PAssert.that(input.apply(underTest)).empty(); + CsvIOParseResult result = input.apply(underTest); + PAssert.that(result.getOutput()).empty(); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -242,7 +254,9 @@ public void givenAllNullableFields_emptyRecord_parsesToPojos() { emptyCustomProcessingMap(), allPrimitiveDataTypesFromRowFn(), ALL_PRIMITIVE_DATA_TYPES_CODER); - PAssert.that(input.apply(underTest)).empty(); + CsvIOParseResult result = input.apply(underTest); + PAssert.that(result.getOutput()).empty(); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -270,7 +284,9 @@ public void givenFieldHasCustomProcessing_parsesToRows() { timeContainingCustomProcessingMap(), ROW_ROW_SERIALIZABLE_FUNCTION, TIME_CONTAINING_ROW_CODER); - PAssert.that(input.apply(underTest)).containsInAnyOrder(want); + CsvIOParseResult result = input.apply(underTest); + PAssert.that(result.getOutput()).containsInAnyOrder(want); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -295,7 +311,9 @@ public void givenFieldHasCustomProcessing_parsesToPojos() { timeContainingCustomProcessingMap(), timeContainingFromRowFn(), TIME_CONTAINING_POJO_CODER); - PAssert.that(input.apply(underTest)).containsInAnyOrder(want); + CsvIOParseResult result = input.apply(underTest); + PAssert.that(result.getOutput()).containsInAnyOrder(want); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -310,8 +328,11 @@ public void givenInvalidCell_throws() { emptyCustomProcessingMap(), 
allPrimitiveDataTypesFromRowFn(), ALL_PRIMITIVE_DATA_TYPES_CODER); - input.apply(underTest); - assertThrows(Pipeline.PipelineExecutionException.class, pipeline::run); + CsvIOParseResult result = input.apply(underTest); + PAssert.that(result.getOutput()).empty(); + PAssert.thatSingleton(result.getErrors().apply(Count.globally())).isEqualTo(1L); + + pipeline.run(); } @Test @@ -328,8 +349,11 @@ public void givenInvalidCustomProcessing_throws() { timeContainingCustomProcessingMap(), timeContainingFromRowFn(), TIME_CONTAINING_POJO_CODER); - input.apply(underTest); - assertThrows(Pipeline.PipelineExecutionException.class, pipeline::run); + CsvIOParseResult result = input.apply(underTest); + PAssert.that(result.getOutput()).empty(); + PAssert.thatSingleton(result.getErrors().apply(Count.globally())).isEqualTo(1L); + + pipeline.run(); } private static PCollection> csvRecords(Pipeline pipeline, String... cells) { diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java index 1b81391c4fb0f..7cbba3335dd28 100644 --- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java +++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOStringToCsvRecordTest.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Collections; +import java.util.List; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -47,12 +48,14 @@ public void givenCommentMarker_skipsLine() { Create.of(headerLine(csvFormat), "#should skip me", "a,1,1.1", "b,2,2.2", "c,3,3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", 
"1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -65,13 +68,15 @@ public void givenNoCommentMarker_doesntSkipLine() { Create.of(headerLine(csvFormat), "#comment", "a,1,1.1", "b,2,2.2", "c,3,3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Collections.singletonList("#comment"), Arrays.asList("a", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -83,12 +88,14 @@ public void givenCustomDelimiter_splitsCells() { pipeline.apply(Create.of(headerLine(csvFormat), "a;1;1.1", "b;2;2.2", "c;3;3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -100,12 +107,14 @@ public void givenEscapeCharacter_includeInCell() { pipeline.apply(Create.of(headerLine(csvFormat), "a$,b,1,1.1", "b,2,2.2", "c,3,3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a,b", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -117,12 +126,14 @@ public void givenHeaderComment_isNoop() { pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "b,2,2.2", "c,3,3.3")); 
CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -134,12 +145,14 @@ public void givenIgnoreEmptyLines_shouldSkip() { pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "", "b,2,2.2", "", "c,3,3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -151,12 +164,14 @@ public void givenNoIgnoreEmptyLines_isNoop() { pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "", "b,2,2.2", "", "c,3,3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -173,12 +188,14 @@ public void givenIgnoreSurroundingSpaces_removesSpaces() { "c,3, 3.3 ")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); 
pipeline.run(); } @@ -195,12 +212,14 @@ public void givenNotIgnoreSurroundingSpaces_keepsSpaces() { "c,3, 3.3 ")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList(" a ", "1", "1.1"), Arrays.asList("b", " 2 ", "2.2"), Arrays.asList("c", "3", " 3.3 "))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -212,12 +231,14 @@ public void givenNullString_parsesNullCells() { pipeline.apply(Create.of(headerLine(csvFormat), "a,1,🐼", "b,🐼,2.2", "🐼,3,3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", null), Arrays.asList("b", null, "2.2"), Arrays.asList(null, "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -229,12 +250,14 @@ public void givenNoNullString_isNoop() { pipeline.apply(Create.of(headerLine(csvFormat), "a,1,🐼", "b,🐼,2.2", "🐼,3,3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", "🐼"), Arrays.asList("b", "🐼", "2.2"), Arrays.asList("🐼", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -246,12 +269,15 @@ public void givenCustomQuoteCharacter_includesSpecialCharacters() { pipeline.apply(Create.of(headerLine(csvFormat), ":a,:,1,1.1", "b,2,2.2", "c,3,3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) 
.containsInAnyOrder( Arrays.asList( Arrays.asList("a,", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); + pipeline.run(); } @@ -267,12 +293,14 @@ public void givenQuoteModeAll_isNoop() { "\"c\",\"3\",\"3.3\"")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -289,12 +317,14 @@ public void givenQuoteModeAllNonNull_isNoop() { "\"c\",\"3\",\"3.3\"")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", null), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -306,12 +336,15 @@ public void givenQuoteModeMinimal_isNoop() { pipeline.apply(Create.of(headerLine(csvFormat), "\"a,\",1,1.1", "b,2,2.2", "c,3,3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a,", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); + pipeline.run(); } @@ -323,12 +356,15 @@ public void givenQuoteModeNonNumeric_isNoop() { Create.of(headerLine(csvFormat), "\"a\",1,1.1", "\"b\",2,2.2", "\"c\",3,3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + 
CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); + pipeline.run(); } @@ -339,12 +375,15 @@ public void givenQuoteModeNone_isNoop() { pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1", "b,2,2.2", "c,3,3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); + pipeline.run(); } @@ -355,10 +394,13 @@ public void givenCustomRecordSeparator_isNoop() { pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1😆b,2,2.2😆c,3,3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Collections.singletonList( Arrays.asList("a", "1", "1.1😆b", "2", "2.2😆c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); + pipeline.run(); } @@ -373,12 +415,15 @@ public void givenSystemRecordSeparator_isNoop() { "a,1,1.1" + systemRecordSeparator + "b,2,2.2" + systemRecordSeparator + "c,3,3.3")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); + pipeline.run(); } @@ -389,12 +434,15 @@ public void givenTrailingDelimiter_skipsEndingDelimiter() { 
pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1,", "b,2,2.2,", "c,3,3.3,")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); + pipeline.run(); } @@ -405,12 +453,15 @@ public void givenNoTrailingDelimiter_includesEndingCell() { pipeline.apply(Create.of(headerLine(csvFormat), "a,1,1.1,", "b,2,2.2,", "c,3,3.3,")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", "1.1", ""), Arrays.asList("b", "2", "2.2", ""), Arrays.asList("c", "3", "3.3", ""))); + PAssert.that(result.getErrors()).empty(); + pipeline.run(); } @@ -426,12 +477,14 @@ public void givenTrim_removesSpaces() { "c,3, 3.3 ")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a", "1", "1.1"), Arrays.asList("b", "2", "2.2"), Arrays.asList("c", "3", "3.3"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -448,12 +501,14 @@ public void givenNoTrim_keepsSpaces() { "c,3, 3.3 ")); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList(" a ", "1", "1.1"), Arrays.asList("b", " 2 ", "2.2"), Arrays.asList("c", "3", " 3.3 "))); + PAssert.that(result.getErrors()).empty(); 
pipeline.run(); } @@ -464,8 +519,10 @@ public void testSingleLineCsvRecord() { PCollection input = pipeline.apply(Create.of(csvRecord)); CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat()); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder(Collections.singletonList(Arrays.asList("a", "1"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); } @@ -478,12 +535,14 @@ public void testMultiLineCsvRecord() { CsvIOStringToCsvRecord underTest = new CsvIOStringToCsvRecord(csvFormat().withRecordSeparator('\n')); - PAssert.that(input.apply(underTest)) + CsvIOParseResult> result = input.apply(underTest); + PAssert.that(result.getOutput()) .containsInAnyOrder( Arrays.asList( Arrays.asList("a\r\n1", "a\r\n2"), Arrays.asList("b\r\n1", "b\r\n2"), Arrays.asList("c\r\n1", "c\r\n2"))); + PAssert.that(result.getErrors()).empty(); pipeline.run(); }