diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java index f81b12c681..35b0136f3f 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroSortedBucketIO.java @@ -249,6 +249,11 @@ public Read from(List inputDirectories) { return toBuilder().setInputDirectories(inputDirectories).build(); } + /** Specifies the avro {@link AvroDatumFactory} for reading. */ + public Read withDatumFactory(AvroDatumFactory datumFactory) { + return toBuilder().setDatumFactory(datumFactory).build(); + } + /** Specifies the input filename suffix. */ public Read withSuffix(String filenameSuffix) { return toBuilder().setFilenameSuffix(filenameSuffix).build(); @@ -345,6 +350,18 @@ abstract static class Builder { abstract Write build(); } + /** Writes to the given output directory. */ + public Write to(String outputDirectory) { + return toBuilder() + .setOutputDirectory(FileSystems.matchNewResource(outputDirectory, true)) + .build(); + } + + /** Specifies the avro {@link AvroDatumFactory} for writing. */ + public Write withDatumFactory(AvroDatumFactory datumFactory) { + return toBuilder().setDatumFactory(datumFactory).build(); + } + /** Specifies the number of buckets for partitioning. */ public Write withNumBuckets(int numBuckets) { return toBuilder().setNumBuckets(numBuckets).build(); @@ -365,13 +382,6 @@ public Write withMetadata(Map metadata) { return toBuilder().setMetadata(metadata).build(); } - /** Writes to the given output directory. */ - public Write to(String outputDirectory) { - return toBuilder() - .setOutputDirectory(FileSystems.matchNewResource(outputDirectory, true)) - .build(); - } - /** Specifies the temporary directory for writing. Defaults to --tempLocation if not set. */ public Write withTempDirectory(String tempDirectory) { return toBuilder() @@ -379,6 +389,31 @@ public Write withTempDirectory(String tempDirectory) { .build(); } + /** Specifies the output filename suffix. */ + public Write withSuffix(String filenameSuffix) { + return toBuilder().setFilenameSuffix(filenameSuffix).build(); + } + + /** Specifies the output filename prefix (i.e. "bucket" or "part"). */ + public Write withFilenamePrefix(String filenamePrefix) { + return toBuilder().setFilenamePrefix(filenamePrefix).build(); + } + + /** Specifies the sorter memory in MB. */ + public Write withSorterMemoryMb(int sorterMemoryMb) { + return toBuilder().setSorterMemoryMb(sorterMemoryMb).build(); + } + + /** Specifies the size of an optional key-to-hash cache in the ExtractKeys transform. */ + public Write withKeyCacheOfSize(int keyCacheSize) { + return toBuilder().setKeyCacheSize(keyCacheSize).build(); + } + + /** Specifies the output file {@link CodecFactory}. */ + public Write withCodec(CodecFactory codec) { + return toBuilder().setCodec(codec).build(); + } + @SuppressWarnings("unchecked") @Override public FileOperations getFileOperations() { @@ -405,31 +440,6 @@ BucketMetadata getBucketMetadata() { throw new IllegalStateException(e); } } - - /** Specifies the output filename suffix. */ - public Write withSuffix(String filenameSuffix) { - return toBuilder().setFilenameSuffix(filenameSuffix).build(); - } - - /** Specifies the output filename prefix (i.e. "bucket" or "part"). */ - public Write withFilenamePrefix(String filenamePrefix) { - return toBuilder().setFilenamePrefix(filenamePrefix).build(); - } - - /** Specifies the sorter memory in MB. */ - public Write withSorterMemoryMb(int sorterMemoryMb) { - return toBuilder().setSorterMemoryMb(sorterMemoryMb).build(); - } - - /** Specifies the size of an optional key-to-hash cache in the ExtractKeys transform. */ - public Write withKeyCacheOfSize(int keyCacheSize) { - return toBuilder().setKeyCacheSize(keyCacheSize).build(); - } - - /** Specifies the output file {@link CodecFactory}. */ - public Write withCodec(CodecFactory codec) { - return toBuilder().setCodec(codec).build(); - } } //////////////////////////////////////////////////////////////////////////////// @@ -491,6 +501,11 @@ public TransformOutput to(String outputDirectory) { .build(); } + /** Specifies the avro {@link AvroDatumFactory} for reading and writing. */ + public TransformOutput withDatumFactory(AvroDatumFactory datumFactory) { + return toBuilder().setDatumFactory(datumFactory).build(); + } + /** Specifies the temporary directory for writing. Defaults to --tempLocation if not set. */ public TransformOutput withTempDirectory(String tempDirectory) { return toBuilder() diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java index 6907c7d33a..6371284cae 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java @@ -51,10 +51,10 @@ import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashFunction; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashFunction; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing; /** * Represents metadata in a JSON-serializable format to be stored alongside sorted-bucket files in a