diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 84353316ed1af..5f13c6092e765 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -301,7 +301,7 @@ public static Map getIdentityPartitions(PartitionSpec p ImmutableMap.Builder columns = ImmutableMap.builder(); for (int i = 0; i < partitionSpec.fields().size(); i++) { PartitionField field = partitionSpec.fields().get(i); - if (field.transform().toString().equals("identity")) { + if (field.transform().isIdentity()) { columns.put(field, i); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java index 9f0d64a6b2261..ef3fb0203c6f1 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java @@ -17,8 +17,10 @@ import com.facebook.presto.common.block.Block; import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.type.FixedWidthType; +import com.facebook.presto.common.type.KllSketchType; import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.hive.NodeVersion; +import com.facebook.presto.iceberg.statistics.KllHistogram; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.Constraint; import com.facebook.presto.spi.PrestoException; @@ -31,6 +33,8 @@ import com.facebook.presto.spi.statistics.TableStatistics; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; import org.apache.datasketches.memory.Memory; import org.apache.datasketches.theta.CompactSketch; 
import org.apache.iceberg.ContentFile; @@ -88,10 +92,13 @@ import static com.facebook.presto.iceberg.IcebergUtil.getIdentityPartitions; import static com.facebook.presto.iceberg.Partition.toMap; import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateAndSetTableSize; +import static com.facebook.presto.iceberg.TypeConverter.toPrestoType; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.HISTOGRAM; import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; +import static com.google.common.collect.Iterators.getOnlyElement; import static java.lang.Long.parseLong; import static java.lang.Math.abs; import static java.lang.String.format; @@ -104,6 +111,7 @@ public class TableStatisticsMaker private static final Logger log = Logger.get(TableStatisticsMaker.class); private static final String ICEBERG_THETA_SKETCH_BLOB_TYPE_ID = "apache-datasketches-theta-v1"; private static final String ICEBERG_DATA_SIZE_BLOB_TYPE_ID = "presto-sum-data-size-bytes-v1"; + private static final String ICEBERG_KLL_SKETCH_BLOB_TYPE_ID = "presto-kll-sketch-bytes-v1"; private static final String ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY = "ndv"; private static final String ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY = "data_size"; private final Table icebergTable; @@ -120,11 +128,13 @@ private TableStatisticsMaker(Table icebergTable, ConnectorSession session, TypeM private static final Map puffinStatWriters = ImmutableMap.builder() .put(NUMBER_OF_DISTINCT_VALUES, TableStatisticsMaker::generateNDVBlob) .put(TOTAL_SIZE_IN_BYTES, TableStatisticsMaker::generateStatSizeBlob) + .put(HISTOGRAM, TableStatisticsMaker::generateKllSketchBlob) .build(); private static final Map puffinStatReaders = ImmutableMap.builder() 
.put(ICEBERG_THETA_SKETCH_BLOB_TYPE_ID, TableStatisticsMaker::readNDVBlob) .put(ICEBERG_DATA_SIZE_BLOB_TYPE_ID, TableStatisticsMaker::readDataSizeBlob) + .put(ICEBERG_KLL_SKETCH_BLOB_TYPE_ID, TableStatisticsMaker::readKllSketchBlob) .build(); public static TableStatistics getTableStatistics(ConnectorSession session, TypeManager typeManager, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable, List columns) @@ -297,7 +307,7 @@ private void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle ta .forEach((key, value) -> { Optional.ofNullable(puffinStatWriters.get(key.getStatisticType())) .ifPresent(generator -> { - writer.add(generator.generate(key, value, icebergTable, snapshot)); + writer.add(generator.generate(key, value, icebergTable, snapshot, typeManager)); }); }); writer.finish(); @@ -323,7 +333,7 @@ private void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle ta @FunctionalInterface private interface PuffinBlobGenerator { - Blob generate(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot); + Blob generate(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager); } @FunctionalInterface @@ -332,12 +342,12 @@ private interface PuffinBlobReader /** * Reads the stats from the blob and then updates the stats builder argument. 
*/ - void read(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder stats); + void read(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder stats, Table icebergTable, TypeManager typeManager); } - private static Blob generateNDVBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot) + private static Blob generateNDVBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager) { - int id = getFieldId(metadata, icebergTable); + int id = getField(metadata, icebergTable, snapshot).fieldId(); ByteBuffer raw = VARBINARY.getSlice(value, 0).toByteBuffer(); CompactSketch sketch = CompactSketch.wrap(Memory.wrap(raw, ByteOrder.nativeOrder())); return new Blob( @@ -350,9 +360,9 @@ private static Blob generateNDVBlob(ColumnStatisticMetadata metadata, Block valu ImmutableMap.of(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY, Long.toString((long) sketch.getEstimate()))); } - private static Blob generateStatSizeBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot) + private static Blob generateStatSizeBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager) { - int id = getFieldId(metadata, icebergTable); + int id = getField(metadata, icebergTable, snapshot).fieldId(); long size = BIGINT.getLong(value, 0); return new Blob( ICEBERG_DATA_SIZE_BLOB_TYPE_ID, @@ -364,7 +374,22 @@ private static Blob generateStatSizeBlob(ColumnStatisticMetadata metadata, Block ImmutableMap.of(ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY, Long.toString(size))); } - private static void readNDVBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics) + private static Blob generateKllSketchBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager) + { + Types.NestedField field = getField(metadata, icebergTable, snapshot); + 
KllSketchType sketchType = new KllSketchType(toPrestoType(field.type(), typeManager)); + Slice sketchSlice = sketchType.getSlice(value, 0); + return new Blob( + ICEBERG_KLL_SKETCH_BLOB_TYPE_ID, + ImmutableList.of(field.fieldId()), + snapshot.snapshotId(), + snapshot.sequenceNumber(), + sketchSlice.toByteBuffer(), + null, + ImmutableMap.of()); + } + + private static void readNDVBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics, Table icebergTable, TypeManager typeManager) { Optional.ofNullable(metadata.properties().get(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY)) .ifPresent(ndvProp -> { @@ -379,7 +404,7 @@ private static void readNDVBlob(BlobMetadata metadata, ByteBuffer blob, ColumnSt }); } - private static void readDataSizeBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics) + private static void readDataSizeBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics, Table icebergTable, TypeManager typeManager) { Optional.ofNullable(metadata.properties().get(ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY)) .ifPresent(sizeProp -> { @@ -394,9 +419,17 @@ private static void readDataSizeBlob(BlobMetadata metadata, ByteBuffer blob, Col }); } - private static int getFieldId(ColumnStatisticMetadata metadata, Table icebergTable) + private static void readKllSketchBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics, Table icebergTable, TypeManager typeManager) + { + statistics.setHistogram(Optional.ofNullable(icebergTable.schemas().get(icebergTable.snapshot(metadata.snapshotId()).schemaId())) + .map(schema -> toPrestoType(schema.findType(getOnlyElement(metadata.inputFields().iterator())), typeManager)) + .map(prestoType -> new KllHistogram(Slices.wrappedBuffer(blob), prestoType))); + } + + private static Types.NestedField getField(ColumnStatisticMetadata metadata, Table icebergTable, Snapshot snapshot) { - return 
Optional.ofNullable(icebergTable.schema().findField(metadata.getColumnName())).map(Types.NestedField::fieldId) + return Optional.ofNullable(icebergTable.schemas().get(snapshot.schemaId())) + .map(schema -> schema.findField(metadata.getColumnName())) .orElseThrow(() -> { log.warn("failed to find column name %s in schema of table %s", metadata.getColumnName(), icebergTable.name()); return new PrestoException(ICEBERG_INVALID_METADATA, format("failed to find column name %s in schema of table %s", metadata.getColumnName(), icebergTable.name())); @@ -500,7 +533,7 @@ private Map loadStatisticsFile(StatisticsFile if (value == null) { value = ColumnStatistics.builder(); } - statReader.read(metadata, blob, value); + statReader.read(metadata, blob, value, icebergTable, typeManager); return value; }); }); @@ -524,6 +557,10 @@ public static List getSupportedColumnStatistics(String supportedStatistics.add(NUMBER_OF_DISTINCT_VALUES.getColumnStatisticMetadataWithCustomFunction(columnName, "sketch_theta")); } + if (isNumericType(type)) { + supportedStatistics.add(HISTOGRAM.getColumnStatisticMetadataWithCustomFunction(columnName, "sketch_kll")); + } + if (!(type instanceof FixedWidthType)) { supportedStatistics.add(TOTAL_SIZE_IN_BYTES.getColumnStatisticMetadata(columnName)); } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/statistics/KllHistogram.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/statistics/KllHistogram.java new file mode 100644 index 0000000000000..8fc910c442d2c --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/statistics/KllHistogram.java @@ -0,0 +1,161 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg.statistics;
+
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.TypeUtils;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.statistics.ConnectorHistogram;
+import com.facebook.presto.spi.statistics.Estimate;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import org.apache.datasketches.common.ArrayOfBooleansSerDe;
+import org.apache.datasketches.common.ArrayOfDoublesSerDe;
+import org.apache.datasketches.common.ArrayOfItemsSerDe;
+import org.apache.datasketches.common.ArrayOfLongsSerDe;
+import org.apache.datasketches.common.ArrayOfStringsSerDe;
+import org.apache.datasketches.kll.KllItemsSketch;
+import org.apache.datasketches.memory.Memory;
+
+import java.util.Comparator;
+import java.util.function.Function;
+
+import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Verify.verify;
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+import static java.util.Objects.requireNonNull;
+import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.EXCLUSIVE;
+import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
+
+/**
+ * A {@link ConnectorHistogram} backed by a serialized KLL items sketch.
+ * <p>
+ * The histogram API works in terms of {@code double}, while the wrapped sketch only
+ * accepts and returns items of the Java type it was built from. This class converts
+ * between the two on every call.
+ */
+public class KllHistogram
+        implements ConnectorHistogram
+{
+    // The item type of the sketch is only known at runtime (it is selected from the
+    // Presto type's Java type), so the sketch is held as a sketch of Object.
+    private final KllItemsSketch<Object> dataSketch;
+    private final Type type;
+    // converts an item stored in the sketch into the double exposed by the histogram API
+    private final Function<Object, Double> toDouble;
+    // converts a double received from the histogram API into the sketch's item type
+    private final Function<Double, Object> fromDouble;
+
+    /**
+     * Wraps the serialized sketch in {@code bytes}, interpreting its items according to {@code type}.
+     *
+     * @param bytes the serialized KLL sketch, read as little-endian
+     * @param type the Presto type of the column the sketch was generated from; must be numeric
+     * @throws NullPointerException if either argument is null
+     * @throws PrestoException if the Java type of {@code type} is neither {@code double} nor {@code long}
+     */
+    @JsonCreator
+    public KllHistogram(@JsonProperty("sketch") Slice bytes, @JsonProperty("type") Type type)
+    {
+        requireNonNull(bytes, "bytes is null");
+        // null-check first so that a missing type is reported as such, not as a failed verify
+        this.type = requireNonNull(type, "type is null");
+        verify(TypeUtils.isNumericType(type), "only numeric types support histograms");
+        // Safe: the comparator and serde are only ever handed items produced by the same
+        // sketch (or by fromDouble below), which are of the matching runtime type.
+        @SuppressWarnings("unchecked")
+        SketchParameters<Object> parameters = (SketchParameters<Object>) getSketchParameters(type);
+        // the actual sketch can only accept the same object types which generated it
+        // however, the API can only accept or generate double types. We cast the inputs
+        // and results to/from double to satisfy the underlying sketch type.
+        // NOTE(review): this assumes that a numeric type whose Java type is long stores
+        // the numeric value itself in that long -- confirm for every type accepted by
+        // TypeUtils.isNumericType.
+        Class<?> javaType = type.getJavaType();
+        if (javaType.equals(double.class)) {
+            toDouble = item -> (Double) item;
+            fromDouble = value -> value;
+        }
+        else if (javaType.equals(long.class)) {
+            toDouble = item -> ((Long) item).doubleValue();
+            fromDouble = value -> value.longValue();
+        }
+        else {
+            throw new PrestoException(INVALID_ARGUMENTS, "can't create kll sketch from type: " + type);
+        }
+        dataSketch = KllItemsSketch.wrap(Memory.wrap(bytes.toByteBuffer(), LITTLE_ENDIAN), parameters.getComparator(), parameters.getSerde());
+    }
+
+    /**
+     * Returns the serialized sketch. The property name matches the {@code "sketch"}
+     * argument of the {@link JsonCreator} constructor so that a serialized histogram
+     * can be read back.
+     */
+    @JsonProperty("sketch")
+    public Slice getDataSketch()
+    {
+        return Slices.wrappedBuffer(dataSketch.toByteArray());
+    }
+
+    @JsonProperty
+    public Type getType()
+    {
+        return type;
+    }
+
+    /**
+     * Estimates the rank of {@code value} within the sketch.
+     *
+     * @param value the value to look up
+     * @param inclusive whether items equal to {@code value} count towards the rank
+     * @return the estimated rank, or an unknown estimate when the sketch holds no items
+     */
+    @Override
+    public Estimate cumulativeProbability(double value, boolean inclusive)
+    {
+        if (dataSketch.isEmpty()) {
+            // an empty sketch cannot answer rank queries
+            return Estimate.unknown();
+        }
+        return Estimate.of(dataSketch.getRank(fromDouble.apply(value), inclusive ? INCLUSIVE : EXCLUSIVE));
+    }
+
+    /**
+     * Estimates the value found at the given rank of the sketch.
+     *
+     * @param percentile the normalized rank to look up
+     * @return the estimated value, or an unknown estimate when the sketch holds no items
+     */
+    @Override
+    public Estimate inverseCumulativeProbability(double percentile)
+    {
+        if (dataSketch.isEmpty()) {
+            // an empty sketch cannot answer quantile queries
+            return Estimate.unknown();
+        }
+        return Estimate.of(toDouble.apply(dataSketch.getQuantile(percentile)));
+    }
+
+    @Override
+    public String toString()
+    {
+        ToStringHelper helper = toStringHelper(this)
+                .add("type", type)
+                .add("k", dataSketch.getK())
+                .add("N", dataSketch.getN())
+                .add("retained", dataSketch.getNumRetained());
+        // min/max/quantiles are only queried when the sketch has items, so that
+        // toString() stays usable in logs and debuggers for an empty sketch
+        if (!dataSketch.isEmpty()) {
+            helper.add("min", dataSketch.getMinItem())
+                    .add("max", dataSketch.getMaxItem())
+                    .add("p50", dataSketch.getQuantile(0.5))
+                    .add("p75", dataSketch.getQuantile(0.75))
+                    .add("p90", dataSketch.getQuantile(0.90))
+                    .add("p99", dataSketch.getQuantile(0.99))
+                    .add("p99.9", dataSketch.getQuantile(0.999));
+        }
+        return helper.toString();
+    }
+
+    /**
+     * The comparator and serializer needed to wrap a sketch holding items of type {@code T}.
+     */
+    private static class SketchParameters<T>
+    {
+        private final Comparator<T> comparator;
+        private final ArrayOfItemsSerDe<T> serde;
+
+        public SketchParameters(Comparator<T> comparator, ArrayOfItemsSerDe<T> serde)
+        {
+            this.comparator = comparator;
+            this.serde = serde;
+        }
+
+        public Comparator<T> getComparator()
+        {
+            return comparator;
+        }
+
+        public ArrayOfItemsSerDe<T> getSerde()
+        {
+            return serde;
+        }
+    }
+
+    /**
+     * Selects the comparator and serde matching the Java type of {@code type}.
+     *
+     * @throws PrestoException if no sketch item type corresponds to the Java type of {@code type}
+     */
+    private static SketchParameters<?> getSketchParameters(Type type)
+    {
+        if (type.getJavaType().equals(double.class)) {
+            return new SketchParameters<>(Double::compareTo, new ArrayOfDoublesSerDe());
+        }
+        else if (type.getJavaType().equals(long.class)) {
+            return new SketchParameters<>(Long::compareTo, new ArrayOfLongsSerDe());
+        }
+        else if (type.getJavaType().equals(Slice.class)) {
+            return new SketchParameters<>(String::compareTo, new ArrayOfStringsSerDe());
+        }
+        else if (type.getJavaType().equals(boolean.class)) {
+            return new SketchParameters<>(Boolean::compareTo, new ArrayOfBooleansSerDe());
+        }
+        else {
+            throw new PrestoException(INVALID_ARGUMENTS, "failed to deserialize KLL Sketch. No suitable type found for " + type);
+        }
+    }
+}