address comments
ahmedabu98 committed Oct 8, 2024
1 parent 0d3292b commit f443bd2
Showing 2 changed files with 48 additions and 56 deletions.
@@ -104,9 +104,9 @@ abstract static class Builder {

abstract Builder setNanValueCounts(Map<Integer, Long> nanValueCounts);

- abstract Builder setLowerBounds(Map<Integer, byte[]> lowerBounds);
+ abstract Builder setLowerBounds(@Nullable Map<Integer, byte[]> lowerBounds);

- abstract Builder setUpperBounds(Map<Integer, byte[]> upperBounds);
+ abstract Builder setUpperBounds(@Nullable Map<Integer, byte[]> upperBounds);

abstract SerializableDataFile build();
}
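
Background on the @Nullable additions: AutoValue-generated builder setters throw a NullPointerException when passed null for a property that is not annotated @Nullable, so these annotations are what allow the refactored from() below to pass a possibly-null map straight into the builder. A minimal sketch of that behavior, using a hypothetical HypotheticalBounds class that is not part of this commit:

    import java.util.Map;
    import org.checkerframework.checker.nullness.qual.Nullable;
    import com.google.auto.value.AutoValue;

    @AutoValue
    abstract class HypotheticalBounds {
      abstract @Nullable Map<Integer, byte[]> lowerBounds();

      static Builder builder() {
        return new AutoValue_HypotheticalBounds.Builder();
      }

      @AutoValue.Builder
      abstract static class Builder {
        // null is accepted here only because the property is @Nullable;
        // without the annotation, the generated setter rejects null immediately.
        abstract Builder setLowerBounds(@Nullable Map<Integer, byte[]> lowerBounds);

        abstract HypotheticalBounds build();
      }
    }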
@@ -116,39 +116,22 @@ abstract static class Builder {
* PartitionKey}.
*/
static SerializableDataFile from(DataFile f, PartitionKey key) {
-   SerializableDataFile.Builder builder =
-       SerializableDataFile.builder()
-           .setPath(f.path().toString())
-           .setFileFormat(f.format().toString())
-           .setRecordCount(f.recordCount())
-           .setFileSizeInBytes(f.fileSizeInBytes())
-           .setPartitionPath(key.toPath())
-           .setPartitionSpecId(f.specId())
-           .setKeyMetadata(f.keyMetadata())
-           .setSplitOffsets(f.splitOffsets())
-           .setColumnSizes(f.columnSizes())
-           .setValueCounts(f.valueCounts())
-           .setNullValueCounts(f.nullValueCounts())
-           .setNanValueCounts(f.nanValueCounts());
-
-   // ByteBuddyUtils has trouble converting Map value type ByteBuffer
-   // to byte[] and back to ByteBuffer, so we perform this conversion manually
-   // here.
-   if (f.lowerBounds() != null) {
-     Map<Integer, byte[]> lowerBounds = new HashMap<>(f.lowerBounds().size());
-     for (Map.Entry<Integer, ByteBuffer> e : f.lowerBounds().entrySet()) {
-       lowerBounds.put(e.getKey(), e.getValue().array());
-     }
-     builder = builder.setLowerBounds(lowerBounds);
-   }
-   if (f.upperBounds() != null) {
-     Map<Integer, byte[]> upperBounds = new HashMap<>(f.upperBounds().size());
-     for (Map.Entry<Integer, ByteBuffer> e : f.upperBounds().entrySet()) {
-       upperBounds.put(e.getKey(), e.getValue().array());
-     }
-     builder = builder.setUpperBounds(upperBounds);
-   }
-   return builder.build();
+   return SerializableDataFile.builder()
+       .setPath(f.path().toString())
+       .setFileFormat(f.format().toString())
+       .setRecordCount(f.recordCount())
+       .setFileSizeInBytes(f.fileSizeInBytes())
+       .setPartitionPath(key.toPath())
+       .setPartitionSpecId(f.specId())
+       .setKeyMetadata(f.keyMetadata())
+       .setSplitOffsets(f.splitOffsets())
+       .setColumnSizes(f.columnSizes())
+       .setValueCounts(f.valueCounts())
+       .setNullValueCounts(f.nullValueCounts())
+       .setNanValueCounts(f.nanValueCounts())
+       .setLowerBounds(toByteArrayMap(f.lowerBounds()))
+       .setUpperBounds(toByteArrayMap(f.upperBounds()))
+       .build();
}

/**
@@ -165,33 +148,15 @@ DataFile createDataFile(PartitionSpec partitionSpec) {
partitionSpec.specId(),
getPartitionSpecId());

-   // ByteBuddyUtils has trouble converting Map value type ByteBuffer
-   // to byte[] and back to ByteBuffer, so we perform this conversion manually
-   // here.
-   Map<Integer, ByteBuffer> lowerBounds = null;
-   Map<Integer, ByteBuffer> upperBounds = null;
-   if (getLowerBounds() != null) {
-     lowerBounds = new HashMap<>(getLowerBounds().size());
-     for (Map.Entry<Integer, byte[]> e : getLowerBounds().entrySet()) {
-       lowerBounds.put(e.getKey(), ByteBuffer.wrap(e.getValue()));
-     }
-   }
-   if (getUpperBounds() != null) {
-     upperBounds = new HashMap<>(getUpperBounds().size());
-     for (Map.Entry<Integer, byte[]> e : getUpperBounds().entrySet()) {
-       upperBounds.put(e.getKey(), ByteBuffer.wrap(e.getValue()));
-     }
-   }

Metrics dataFileMetrics =
new Metrics(
getRecordCount(),
getColumnSizes(),
getValueCounts(),
getNullValueCounts(),
getNanValueCounts(),
-           lowerBounds,
-           upperBounds);
+           toByteBufferMap(getLowerBounds()),
+           toByteBufferMap(getUpperBounds()));

return DataFiles.builder(partitionSpec)
.withFormat(FileFormat.fromString(getFileFormat()))
@@ -203,4 +168,31 @@ DataFile createDataFile(PartitionSpec partitionSpec) {
.withSplitOffsets(getSplitOffsets())
.build();
}
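
Taken together, the two methods form a round trip: from() flattens an Iceberg DataFile into a serializable form that can cross a Beam shuffle, and createDataFile() rebuilds an equivalent DataFile afterwards. A hedged usage sketch, where dataFile, partitionKey, and partitionSpec are hypothetical values obtained from Iceberg elsewhere:

    // Sketch only: dataFile, partitionKey, and partitionSpec are assumed to come
    // from Iceberg (e.g. a completed data write and the table's partition spec).
    SerializableDataFile serializable = SerializableDataFile.from(dataFile, partitionKey);

    // ...serializable can now be encoded and shuffled between Beam workers...

    // Rebuild the DataFile; createDataFile checks that partitionSpec.specId()
    // matches the recorded getPartitionSpecId(), so the same spec must be supplied.
    DataFile restored = serializable.createDataFile(partitionSpec);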

+ // ByteBuddyUtils has trouble converting Map value type ByteBuffer
+ // to byte[] and back to ByteBuffer, so we perform these conversions manually
+ // TODO(https://github.com/apache/beam/issues/32701)
+ private static @Nullable Map<Integer, byte[]> toByteArrayMap(
+     @Nullable Map<Integer, ByteBuffer> input) {
+   if (input == null) {
+     return null;
+   }
+   Map<Integer, byte[]> output = new HashMap<>(input.size());
+   for (Map.Entry<Integer, ByteBuffer> e : input.entrySet()) {
+     output.put(e.getKey(), e.getValue().array());
+   }
+   return output;
+ }
+
+ private static @Nullable Map<Integer, ByteBuffer> toByteBufferMap(
+     @Nullable Map<Integer, byte[]> input) {
+   if (input == null) {
+     return null;
+   }
+   Map<Integer, ByteBuffer> output = new HashMap<>(input.size());
+   for (Map.Entry<Integer, byte[]> e : input.entrySet()) {
+     output.put(e.getKey(), ByteBuffer.wrap(e.getValue()));
+   }
+   return output;
+ }
}
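
The two helpers are intended to be inverses, which holds because ByteBuffer.wrap(byte[]) produces a heap buffer whose backing array is exactly the wrapped array. Note that ByteBuffer.array() exposes the entire backing array, so the conversion is only exact for zero-offset, full-length heap buffers of that kind. A standalone sketch of the per-entry round trip:

    import java.nio.ByteBuffer;

    public class BoundsRoundTripDemo {
      public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.wrap(new byte[] {0x01, 0x02, 0x03});

        // ByteBuffer -> byte[], as in toByteArrayMap.
        byte[] raw = original.array();

        // byte[] -> ByteBuffer, as in toByteBufferMap.
        ByteBuffer restored = ByteBuffer.wrap(raw);

        // equals() compares the remaining bytes, so this prints true.
        System.out.println(original.equals(restored));
      }
    }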
@@ -60,7 +60,7 @@ class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, IcebergWriteResult> {
this.dynamicDestinations = dynamicDestinations;
this.catalogConfig = catalogConfig;
this.triggeringFrequency = triggeringFrequency;
-   // single unique prefix per pipeline
+   // single unique prefix per write transform
this.filePrefix = UUID.randomUUID().toString();
}

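On the corrected comment: the prefix is generated in the constructor, so its scope is one WriteToDestinations instance, i.e. one write transform; a pipeline containing two Iceberg writes gets two independent prefixes. A trivial sketch of why the new wording is the accurate one:

    import java.util.UUID;

    public class PrefixScopeDemo {
      public static void main(String[] args) {
        // Each write transform constructs its own WriteToDestinations, and each
        // construction draws a fresh UUID, so prefixes never collide across writes.
        String firstWritePrefix = UUID.randomUUID().toString();
        String secondWritePrefix = UUID.randomUUID().toString();
        System.out.println(firstWritePrefix.equals(secondWritePrefix)); // false
      }
    }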
