Skip to content

Commit

Permalink
Add Kll histogram support to Iceberg connector
Browse files Browse the repository at this point in the history
  • Loading branch information
ZacBlanco committed Mar 29, 2024
1 parent 79251b5 commit 58cfe30
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public static Map<PartitionField, Integer> getIdentityPartitions(PartitionSpec p
ImmutableMap.Builder<PartitionField, Integer> columns = ImmutableMap.builder();
for (int i = 0; i < partitionSpec.fields().size(); i++) {
PartitionField field = partitionSpec.fields().get(i);
if (field.transform().toString().equals("identity")) {
if (field.transform().isIdentity()) {
columns.put(field, i);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.FixedWidthType;
import com.facebook.presto.common.type.KllSketchType;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.iceberg.statistics.KllHistogram;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException;
Expand All @@ -31,6 +33,8 @@
import com.facebook.presto.spi.statistics.TableStatistics;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.CompactSketch;
import org.apache.iceberg.ContentFile;
Expand Down Expand Up @@ -88,10 +92,12 @@
import static com.facebook.presto.iceberg.IcebergUtil.getIdentityPartitions;
import static com.facebook.presto.iceberg.Partition.toMap;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.HISTOGRAM;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Iterators.getOnlyElement;
import static java.lang.Long.parseLong;
import static java.lang.Math.abs;
import static java.lang.String.format;
Expand All @@ -104,6 +110,7 @@ public class TableStatisticsMaker
private static final Logger log = Logger.get(TableStatisticsMaker.class);
private static final String ICEBERG_THETA_SKETCH_BLOB_TYPE_ID = "apache-datasketches-theta-v1";
private static final String ICEBERG_DATA_SIZE_BLOB_TYPE_ID = "presto-sum-data-size-bytes-v1";
private static final String ICEBERG_KLL_SKETCH_BLOB_TYPE_ID = "presto-kll-sketch-bytes-v1";
private static final String ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY = "ndv";
private static final String ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY = "data_size";
private final Table icebergTable;
Expand All @@ -120,11 +127,13 @@ private TableStatisticsMaker(Table icebergTable, ConnectorSession session, TypeM
private static final Map<ColumnStatisticType, PuffinBlobGenerator> puffinStatWriters = ImmutableMap.<ColumnStatisticType, PuffinBlobGenerator>builder()
.put(NUMBER_OF_DISTINCT_VALUES, TableStatisticsMaker::generateNDVBlob)
.put(TOTAL_SIZE_IN_BYTES, TableStatisticsMaker::generateStatSizeBlob)
.put(HISTOGRAM, TableStatisticsMaker::generateKllSketchBlob)
.build();

private static final Map<String, PuffinBlobReader> puffinStatReaders = ImmutableMap.<String, PuffinBlobReader>builder()
.put(ICEBERG_THETA_SKETCH_BLOB_TYPE_ID, TableStatisticsMaker::readNDVBlob)
.put(ICEBERG_DATA_SIZE_BLOB_TYPE_ID, TableStatisticsMaker::readDataSizeBlob)
.put(ICEBERG_KLL_SKETCH_BLOB_TYPE_ID, TableStatisticsMaker::readKllSketchBlob)
.build();

public static TableStatistics getTableStatistics(ConnectorSession session, TypeManager typeManager, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable, List<IcebergColumnHandle> columns)
Expand Down Expand Up @@ -298,7 +307,7 @@ private void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle ta
.forEach((key, value) -> {
Optional.ofNullable(puffinStatWriters.get(key.getStatisticType()))
.ifPresent(generator -> {
writer.add(generator.generate(key, value, icebergTable, snapshot));
writer.add(generator.generate(key, value, icebergTable, snapshot, typeManager));
});
});
writer.finish();
Expand All @@ -324,7 +333,7 @@ private void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle ta
@FunctionalInterface
private interface PuffinBlobGenerator
{
Blob generate(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot);
Blob generate(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager);
}

@FunctionalInterface
Expand All @@ -333,12 +342,12 @@ private interface PuffinBlobReader
/**
* Reads the stats from the blob and then updates the stats builder argument.
*/
void read(BlobMetadata metadata, ByteBuffer blop, ColumnStatistics.Builder stats);
void read(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder stats, Table icebergTable, TypeManager typeManager);
}

private static Blob generateNDVBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot)
private static Blob generateNDVBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager)
{
int id = getFieldId(metadata, icebergTable);
int id = getField(metadata, icebergTable, snapshot).fieldId();
ByteBuffer raw = VARBINARY.getSlice(value, 0).toByteBuffer();
CompactSketch sketch = CompactSketch.wrap(Memory.wrap(raw, ByteOrder.nativeOrder()));
return new Blob(
Expand All @@ -351,9 +360,9 @@ private static Blob generateNDVBlob(ColumnStatisticMetadata metadata, Block valu
ImmutableMap.of(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY, Long.toString((long) sketch.getEstimate())));
}

private static Blob generateStatSizeBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot)
private static Blob generateStatSizeBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager)
{
int id = getFieldId(metadata, icebergTable);
int id = getField(metadata, icebergTable, snapshot).fieldId();
long size = BIGINT.getLong(value, 0);
return new Blob(
ICEBERG_DATA_SIZE_BLOB_TYPE_ID,
Expand All @@ -365,7 +374,22 @@ private static Blob generateStatSizeBlob(ColumnStatisticMetadata metadata, Block
ImmutableMap.of(ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY, Long.toString(size)));
}

private static void readNDVBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics)
private static Blob generateKllSketchBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager)
{
Types.NestedField field = getField(metadata, icebergTable, snapshot);
KllSketchType sketchType = new KllSketchType(toPrestoType(field.type(), typeManager));
Slice sketchSlice = sketchType.getSlice(value, 0);
return new Blob(
ICEBERG_KLL_SKETCH_BLOB_TYPE_ID,
ImmutableList.of(field.fieldId()),
snapshot.snapshotId(),
snapshot.sequenceNumber(),
sketchSlice.toByteBuffer(),
null,
ImmutableMap.of());
}

private static void readNDVBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics, Table icebergTable, TypeManager typeManager)
{
Optional.ofNullable(metadata.properties().get(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY))
.ifPresent(ndvProp -> {
Expand All @@ -380,7 +404,7 @@ private static void readNDVBlob(BlobMetadata metadata, ByteBuffer blob, ColumnSt
});
}

private static void readDataSizeBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics)
private static void readDataSizeBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics, Table icebergTable, TypeManager typeManager)
{
Optional.ofNullable(metadata.properties().get(ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY))
.ifPresent(sizeProp -> {
Expand All @@ -395,9 +419,17 @@ private static void readDataSizeBlob(BlobMetadata metadata, ByteBuffer blob, Col
});
}

private static int getFieldId(ColumnStatisticMetadata metadata, Table icebergTable)
private static void readKllSketchBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics, Table icebergTable, TypeManager typeManager)
{
statistics.setHistogram(Optional.ofNullable(icebergTable.schemas().get(icebergTable.snapshot(metadata.snapshotId()).schemaId()))
.map(schema -> toPrestoType(schema.findType(getOnlyElement(metadata.inputFields().iterator())), typeManager))
.map(prestoType -> new KllHistogram(Slices.wrappedBuffer(blob), prestoType)));
}

private static Types.NestedField getField(ColumnStatisticMetadata metadata, Table icebergTable, Snapshot snapshot)
{
return Optional.ofNullable(icebergTable.schema().findField(metadata.getColumnName())).map(Types.NestedField::fieldId)
return Optional.ofNullable(icebergTable.schemas().get(snapshot.schemaId()))
.map(schema -> schema.findField(metadata.getColumnName()))
.orElseThrow(() -> {
log.warn("failed to find column name %s in schema of table %s", metadata.getColumnName(), icebergTable.name());
return new PrestoException(ICEBERG_INVALID_METADATA, format("failed to find column name %s in schema of table %s", metadata.getColumnName(), icebergTable.name()));
Expand Down Expand Up @@ -524,7 +556,7 @@ private Map<Integer, ColumnStatistics.Builder> loadStatisticsFile(StatisticsFile
if (value == null) {
value = ColumnStatistics.builder();
}
statReader.read(metadata, blob, value);
statReader.read(metadata, blob, value, icebergTable, typeManager);
return value;
});
});
Expand All @@ -548,6 +580,10 @@ public static List<ColumnStatisticMetadata> getSupportedColumnStatistics(String
supportedStatistics.add(NUMBER_OF_DISTINCT_VALUES.getColumnStatisticMetadataWithCustomFunction(columnName, "sketch_theta"));
}

if (isNumericType(type)) {
supportedStatistics.add(HISTOGRAM.getColumnStatisticMetadataWithCustomFunction(columnName, "sketch_kll"));
}

if (!(type instanceof FixedWidthType)) {
supportedStatistics.add(TOTAL_SIZE_IN_BYTES.getColumnStatisticMetadata(columnName));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.statistics;

import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeUtils;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.statistics.ConnectorHistogram;
import com.facebook.presto.spi.statistics.Estimate;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.apache.datasketches.common.ArrayOfBooleansSerDe;
import org.apache.datasketches.common.ArrayOfDoublesSerDe;
import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.ArrayOfLongsSerDe;
import org.apache.datasketches.common.ArrayOfStringsSerDe;
import org.apache.datasketches.kll.KllItemsSketch;
import org.apache.datasketches.memory.Memory;

import java.util.Comparator;
import java.util.function.Function;

import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Verify.verify;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.util.Objects.requireNonNull;
import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.EXCLUSIVE;
import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;

public class KllHistogram
implements ConnectorHistogram
{
private final KllItemsSketch dataSketch;
private final Type type;
private final Function<Object, Double> toDouble;
private final Function<Double, Object> fromDouble;

@JsonCreator
public KllHistogram(@JsonProperty("sketch") Slice bytes, @JsonProperty("type") Type type)
{
verify(TypeUtils.isNumericType(type), "only numeric types support histograms");
this.type = requireNonNull(type, "type is null");
SketchParameters parameters = getSketchParameters(type);
// the actual sketch can only accept the same object types which generated it
// however, the API can only accept or generate double types. We cast the inputs
// and results to/from double to satisfy the underlying sketch type.
if (type.getJavaType().isAssignableFrom(double.class)) {
toDouble = x -> (double) x;
fromDouble = x -> x;
}
else if (type.getJavaType().isAssignableFrom(long.class)) {
toDouble = x -> (double) (long) x;
fromDouble = x -> (long) (double) x;
}
else if (type.getJavaType().isAssignableFrom(int.class)) {
toDouble = x -> (double) (int) x;
fromDouble = x -> (int) (double) x;
}
else {
throw new PrestoException(INVALID_ARGUMENTS, "can't create kll sketch from type: " + type);
}
dataSketch = KllItemsSketch.wrap(Memory.wrap(bytes.toByteBuffer(), LITTLE_ENDIAN), parameters.getComparator(), parameters.getSerde());
}

@JsonProperty
public Slice getDataSketch()
{
return Slices.wrappedBuffer(dataSketch.toByteArray());
}

@JsonProperty
public Type getType()
{
return type;
}

@Override
public Estimate cumulativeProbability(double value, boolean inclusive)
{
return Estimate.of(dataSketch.getRank(fromDouble.apply(value), inclusive ? INCLUSIVE : EXCLUSIVE));
}

@Override
public Estimate inverseCumulativeProbability(double percentile)
{
return Estimate.of(toDouble.apply(dataSketch.getQuantile(percentile)));
}

@Override
public String toString()
{
return toStringHelper(this)
.add("type", type)
.add("k", this.dataSketch.getK())
.add("N", this.dataSketch.getN())
.add("retained", this.dataSketch.getNumRetained())
.add("min", this.dataSketch.getMinItem())
.add("max", this.dataSketch.getMaxItem())
.add("p50", dataSketch.getQuantile(0.5))
.add("p75", dataSketch.getQuantile(0.75))
.add("p90", dataSketch.getQuantile(0.90))
.add("p99", dataSketch.getQuantile(0.99))
.add("p99.9", dataSketch.getQuantile(0.999))
.toString();
}

private static class SketchParameters<T>
{
private final Comparator<T> comparator;
private final ArrayOfItemsSerDe<T> serde;

public SketchParameters(Comparator<T> comparator, ArrayOfItemsSerDe<T> serde)
{
this.comparator = comparator;
this.serde = serde;
}

public Comparator<T> getComparator()
{
return comparator;
}

public ArrayOfItemsSerDe<T> getSerde()
{
return serde;
}
}

private static SketchParameters<?> getSketchParameters(Type type)
{
if (type.getJavaType().equals(double.class)) {
return new SketchParameters<>(Double::compareTo, new ArrayOfDoublesSerDe());
}
else if (type.getJavaType().equals(long.class)) {
return new SketchParameters<>(Long::compareTo, new ArrayOfLongsSerDe());
}
else if (type.getJavaType().equals(Slice.class)) {
return new SketchParameters<>(String::compareTo, new ArrayOfStringsSerDe());
}
else if (type.getJavaType().equals(boolean.class)) {
return new SketchParameters<>(Boolean::compareTo, new ArrayOfBooleansSerDe());
}
else {
throw new PrestoException(INVALID_ARGUMENTS, "failed to deserialize KLL Sketch. No suitable type found for " + type);
}
}
}

0 comments on commit 58cfe30

Please sign in to comment.