Skip to content

Commit

Permalink
Add missing avro SMB api (#5202)
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones authored Jan 25, 2024
1 parent 91d4d80 commit 80c6d3c
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ public Read<T> from(List<String> inputDirectories) {
return toBuilder().setInputDirectories(inputDirectories).build();
}

/**
 * Returns a copy of this read transform that uses the supplied Avro
 * {@link AvroDatumFactory} to create datum readers for the input records.
 *
 * @param datumFactory factory producing Avro datum readers for {@code T}
 * @return a new {@code Read} with the datum factory set
 */
public Read<T> withDatumFactory(AvroDatumFactory<T> datumFactory) {
  return toBuilder()
      .setDatumFactory(datumFactory)
      .build();
}

/** Specifies the input filename suffix. */
public Read<T> withSuffix(String filenameSuffix) {
return toBuilder().setFilenameSuffix(filenameSuffix).build();
Expand Down Expand Up @@ -345,6 +350,18 @@ abstract static class Builder<K1, K2, T extends IndexedRecord> {
abstract Write<K1, K2, T> build();
}

/**
 * Returns a copy of this write transform that writes its output to
 * {@code outputDirectory}. The path is resolved via
 * {@link FileSystems#matchNewResource} as a directory.
 *
 * @param outputDirectory directory path for the written files
 * @return a new {@code Write} with the output directory set
 */
public Write<K1, K2, T> to(String outputDirectory) {
  return toBuilder()
      .setOutputDirectory(
          FileSystems.matchNewResource(outputDirectory, true))
      .build();
}

/**
 * Returns a copy of this write transform that uses the supplied Avro
 * {@link AvroDatumFactory} to create datum writers for the output records.
 *
 * @param datumFactory factory producing Avro datum writers for {@code T}
 * @return a new {@code Write} with the datum factory set
 */
public Write<K1, K2, T> withDatumFactory(AvroDatumFactory<T> datumFactory) {
  return toBuilder()
      .setDatumFactory(datumFactory)
      .build();
}

/** Specifies the number of buckets for partitioning. */
public Write<K1, K2, T> withNumBuckets(int numBuckets) {
return toBuilder().setNumBuckets(numBuckets).build();
Expand All @@ -365,20 +382,38 @@ public Write<K1, K2, T> withMetadata(Map<String, Object> metadata) {
return toBuilder().setMetadata(metadata).build();
}

/**
 * Writes to the given output directory.
 *
 * <p>The path is resolved as a directory via {@link FileSystems#matchNewResource}.
 *
 * @param outputDirectory directory path for the written files
 * @return a new {@code Write} with the output directory set
 */
public Write<K1, K2, T> to(String outputDirectory) {
return toBuilder()
.setOutputDirectory(FileSystems.matchNewResource(outputDirectory, true))
.build();
}

/**
 * Returns a copy of this write transform that stages intermediate output under
 * {@code tempDirectory}, resolved as a directory via
 * {@link FileSystems#matchNewResource}. When unset, {@code --tempLocation} is
 * used instead.
 *
 * @param tempDirectory directory path for temporary files
 * @return a new {@code Write} with the temp directory set
 */
public Write<K1, K2, T> withTempDirectory(String tempDirectory) {
  return toBuilder()
      .setTempDirectory(
          FileSystems.matchNewResource(tempDirectory, true))
      .build();
}

/**
 * Returns a copy of this write transform with the given filename suffix
 * applied to output files.
 *
 * @param filenameSuffix suffix appended to generated file names
 * @return a new {@code Write} with the suffix set
 */
public Write<K1, K2, T> withSuffix(String filenameSuffix) {
  return toBuilder()
      .setFilenameSuffix(filenameSuffix)
      .build();
}

/**
 * Returns a copy of this write transform with the given filename prefix
 * (i.e. "bucket" or "part") applied to output files.
 *
 * @param filenamePrefix prefix for generated file names
 * @return a new {@code Write} with the prefix set
 */
public Write<K1, K2, T> withFilenamePrefix(String filenamePrefix) {
  return toBuilder()
      .setFilenamePrefix(filenamePrefix)
      .build();
}

/**
 * Returns a copy of this write transform with the sorter memory budget set,
 * in megabytes.
 *
 * @param sorterMemoryMb sorter memory in MB
 * @return a new {@code Write} with the sorter memory set
 */
public Write<K1, K2, T> withSorterMemoryMb(int sorterMemoryMb) {
  return toBuilder()
      .setSorterMemoryMb(sorterMemoryMb)
      .build();
}

/**
 * Returns a copy of this write transform with an optional key-to-hash cache of
 * the given size, used by the ExtractKeys transform.
 *
 * @param keyCacheSize maximum number of cached key hashes
 * @return a new {@code Write} with the key cache size set
 */
public Write<K1, K2, T> withKeyCacheOfSize(int keyCacheSize) {
  return toBuilder()
      .setKeyCacheSize(keyCacheSize)
      .build();
}

/**
 * Returns a copy of this write transform with the given Avro
 * {@link CodecFactory} used for output file compression.
 *
 * @param codec Avro codec for output files
 * @return a new {@code Write} with the codec set
 */
public Write<K1, K2, T> withCodec(CodecFactory codec) {
  return toBuilder()
      .setCodec(codec)
      .build();
}

@SuppressWarnings("unchecked")
@Override
public FileOperations<T> getFileOperations() {
Expand All @@ -405,31 +440,6 @@ BucketMetadata<K1, K2, T> getBucketMetadata() {
throw new IllegalStateException(e);
}
}

/**
 * Specifies the output filename suffix.
 *
 * @param filenameSuffix suffix appended to generated file names
 * @return a new {@code Write} with the suffix set
 */
public Write<K1, K2, T> withSuffix(String filenameSuffix) {
return toBuilder().setFilenameSuffix(filenameSuffix).build();
}

/**
 * Specifies the output filename prefix (i.e. "bucket" or "part").
 *
 * @param filenamePrefix prefix for generated file names
 * @return a new {@code Write} with the prefix set
 */
public Write<K1, K2, T> withFilenamePrefix(String filenamePrefix) {
return toBuilder().setFilenamePrefix(filenamePrefix).build();
}

/**
 * Specifies the sorter memory in MB.
 *
 * @param sorterMemoryMb sorter memory budget in megabytes
 * @return a new {@code Write} with the sorter memory set
 */
public Write<K1, K2, T> withSorterMemoryMb(int sorterMemoryMb) {
return toBuilder().setSorterMemoryMb(sorterMemoryMb).build();
}

/**
 * Specifies the size of an optional key-to-hash cache in the ExtractKeys transform.
 *
 * @param keyCacheSize maximum number of cached key hashes
 * @return a new {@code Write} with the key cache size set
 */
public Write<K1, K2, T> withKeyCacheOfSize(int keyCacheSize) {
return toBuilder().setKeyCacheSize(keyCacheSize).build();
}

/**
 * Specifies the output file {@link CodecFactory}.
 *
 * @param codec Avro codec used to compress output files
 * @return a new {@code Write} with the codec set
 */
public Write<K1, K2, T> withCodec(CodecFactory codec) {
return toBuilder().setCodec(codec).build();
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -491,6 +501,11 @@ public TransformOutput<K1, K2, T> to(String outputDirectory) {
.build();
}

/**
 * Returns a copy of this transform that uses the supplied Avro
 * {@link AvroDatumFactory} to create datum readers and writers for {@code T}.
 *
 * @param datumFactory factory producing Avro datum readers/writers
 * @return a new {@code TransformOutput} with the datum factory set
 */
public TransformOutput<K1, K2, T> withDatumFactory(AvroDatumFactory<T> datumFactory) {
  return toBuilder()
      .setDatumFactory(datumFactory)
      .build();
}

/** Specifies the temporary directory for writing. Defaults to --tempLocation if not set. */
public TransformOutput<K1, K2, T> withTempDirectory(String tempDirectory) {
return toBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;

/**
* Represents metadata in a JSON-serializable format to be stored alongside sorted-bucket files in a
Expand Down

0 comments on commit 80c6d3c

Please sign in to comment.