diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index ec640a5f6ab84..41a8e6d9950e0 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -225,10 +225,13 @@ Property Name Description ``iceberg.enable-parquet-dereference-pushdown`` Enable parquet dereference pushdown. ``true`` -``iceberg.hive-statistics-merge-strategy`` Determines how to merge statistics that are stored in the ``NONE`` - Hive Metastore. The available values are ``NONE``, - ``USE_NULLS_FRACTION_AND_NDV``, ``USE_NULLS_FRACTIONS`` - and, ``USE_NDV`` +``iceberg.hive-statistics-merge-strategy`` Comma separated list of statistics to use from the + Hive Metastore to override Iceberg table statistics. + The available values are ``NUMBER_OF_DISTINCT_VALUES``, + ``NUMBER_OF_NON_NULL_VALUES``, ``TOTAL_SIZE_IN_BYTES``. + + **Note**: Only valid when the Iceberg connector is + configured with Hive. ``iceberg.statistic-snapshot-record-difference-weight`` The amount that the difference in total record count matters when calculating the closest snapshot when picking @@ -306,6 +309,8 @@ Property Name Description ============================================= ====================================================================== ``iceberg.delete_as_join_rewrite_enabled`` Overrides the behavior of the connector property ``iceberg.delete-as-join-rewrite-enabled`` in the current session. +``iceberg.hive_statistics_merge_strategy`` Overrides the behavior of the connector property + ``iceberg.hive-statistics-merge-strategy`` in the current session. ============================================= ====================================================================== Caching Support @@ -1172,7 +1177,7 @@ each Iceberg data type to the corresponding Presto data type, and from each Pres The following tables detail the specific type maps between PrestoDB and Iceberg. Iceberg to PrestoDB type mapping -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Map of Iceberg types to the relevant PrestoDB types: @@ -1215,7 +1220,7 @@ Map of Iceberg types to the relevant PrestoDB types: No other types are supported. 
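For illustration, with a Hive-backed Iceberg catalog the merge strategy can be set in the catalog properties file, for example ``iceberg.hive-statistics-merge-strategy=NUMBER_OF_DISTINCT_VALUES,NUMBER_OF_NON_NULL_VALUES``, or overridden per session through the ``iceberg.hive_statistics_merge_strategy`` session property documented above. The sketch below assumes a catalog named ``iceberg``; the chosen statistic values are examples only:

```sql
-- Override the Hive statistics merge strategy for the current session
-- (the catalog name "iceberg" is an assumption; the values are illustrative).
SET SESSION iceberg.hive_statistics_merge_strategy = 'NUMBER_OF_DISTINCT_VALUES,TOTAL_SIZE_IN_BYTES';

-- Revert to the strategy configured on the catalog.
RESET SESSION iceberg.hive_statistics_merge_strategy;
```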
PrestoDB to Iceberg type mapping -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Map of PrestoDB types to the relevant Iceberg types: diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java index 8bf6686c9ec70..78573bfe43374 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java @@ -16,7 +16,7 @@ import com.facebook.airlift.configuration.Config; import com.facebook.airlift.configuration.ConfigDescription; import com.facebook.presto.hive.HiveCompressionCodec; -import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy; +import com.facebook.presto.spi.statistics.ColumnStatisticType; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import org.apache.iceberg.hadoop.HadoopFileIO; @@ -26,7 +26,11 @@ import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import java.util.Arrays; +import java.util.EnumSet; import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; import static com.facebook.presto.hive.HiveCompressionCodec.GZIP; import static com.facebook.presto.iceberg.CatalogType.HIVE; @@ -51,12 +55,13 @@ public class IcebergConfig private boolean pushdownFilterEnabled; private boolean deleteAsJoinRewriteEnabled = true; - private HiveStatisticsMergeStrategy hiveStatisticsMergeStrategy = HiveStatisticsMergeStrategy.NONE; + private EnumSet hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class); private String fileIOImpl = HadoopFileIO.class.getName(); private boolean manifestCachingEnabled; private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT; private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT; private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT; + @NotNull public FileFormat getFileFormat() { @@ -195,16 +200,22 @@ public boolean isMergeOnReadModeEnabled() } @Config("iceberg.hive-statistics-merge-strategy") - @ConfigDescription("determines how to merge statistics that are stored in the Hive Metastore") - public IcebergConfig setHiveStatisticsMergeStrategy(HiveStatisticsMergeStrategy mergeStrategy) - { - this.hiveStatisticsMergeStrategy = mergeStrategy; + @ConfigDescription("Comma separated list of statistics to use from the Hive metastore to override iceberg table statistics") + public IcebergConfig setHiveStatisticsMergeFlags(String mergeFlags) + { + this.hiveStatisticsMergeFlags = Optional.of(Arrays.stream(mergeFlags.trim().split(",")) + .filter(value -> !value.isEmpty()) + .map(ColumnStatisticType::valueOf) + .collect(Collectors.toSet())) + .filter(set -> !set.isEmpty()) + .map(EnumSet::copyOf) + .orElse(EnumSet.noneOf(ColumnStatisticType.class)); return this; } - public HiveStatisticsMergeStrategy getHiveStatisticsMergeStrategy() + public EnumSet getHiveStatisticsMergeFlags() { - return hiveStatisticsMergeStrategy; + return hiveStatisticsMergeFlags; } @Config("iceberg.statistic-snapshot-record-difference-weight") diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index 113ccc931da7e..76c1debc31821 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ 
b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -34,7 +34,6 @@ import com.facebook.presto.hive.metastore.PrestoTableType; import com.facebook.presto.hive.metastore.PrincipalPrivileges; import com.facebook.presto.hive.metastore.Table; -import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorNewTableLayout; import com.facebook.presto.spi.ConnectorOutputTableHandle; @@ -57,6 +56,7 @@ import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.spi.security.PrestoPrincipal; import com.facebook.presto.spi.statistics.ColumnStatisticMetadata; +import com.facebook.presto.spi.statistics.ColumnStatisticType; import com.facebook.presto.spi.statistics.ColumnStatistics; import com.facebook.presto.spi.statistics.ComputedStatistics; import com.facebook.presto.spi.statistics.Estimate; @@ -82,6 +82,7 @@ import java.io.IOException; import java.time.ZoneId; import java.util.Collection; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -442,7 +443,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab IcebergTableHandle handle = (IcebergTableHandle) tableHandle; org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName()); TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(session, typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList())); - HiveStatisticsMergeStrategy mergeStrategy = getHiveStatisticsMergeStrategy(session); + EnumSet mergeFlags = getHiveStatisticsMergeStrategy(session); return tableLayoutHandle.map(IcebergTableLayoutHandle.class::cast).map(layoutHandle -> { TupleDomain domainPredicate = layoutHandle.getDomainPredicate() .transform(subfield -> isEntireColumn(subfield) ? 
subfield.getRootName() : null) @@ -455,7 +456,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab }); RowExpression translatedPredicate = rowExpressionService.getDomainTranslator().toPredicate(predicate); PartitionStatistics hiveStatistics = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName()); - TableStatistics mergedStatistics = mergeHiveStatistics(icebergStatistics, hiveStatistics, mergeStrategy, icebergTable.spec()); + TableStatistics mergedStatistics = mergeHiveStatistics(icebergStatistics, hiveStatistics, mergeFlags, icebergTable.spec()); TableStatistics.Builder filteredStatsBuilder = TableStatistics.builder() .setRowCount(mergedStatistics.getRowCount()); double totalSize = 0; @@ -481,11 +482,8 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab IcebergColumnHandle::getName, IcebergColumnHandle::getType))); }).orElseGet(() -> { - if (!mergeStrategy.equals(HiveStatisticsMergeStrategy.NONE)) { - PartitionStatistics hiveStats = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName()); - return mergeHiveStatistics(icebergStatistics, hiveStats, mergeStrategy, icebergTable.spec()); - } - return icebergStatistics; + PartitionStatistics hiveStats = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName()); + return mergeHiveStatistics(icebergStatistics, hiveStats, mergeFlags, icebergTable.spec()); }); } @@ -500,9 +498,16 @@ public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession { Set columnStatistics = tableMetadata.getColumns().stream() .filter(column -> !column.isHidden()) - .flatMap(meta -> metastore.getSupportedColumnStatistics(getMetastoreContext(session), meta.getType()) - .stream() - .map(statType -> statType.getColumnStatisticMetadata(meta.getName()))) + .flatMap(meta -> { + try { + return metastore.getSupportedColumnStatistics(getMetastoreContext(session), meta.getType()) + .stream() + .map(statType -> statType.getColumnStatisticMetadata(meta.getName())); + } + catch (IllegalArgumentException e) { + return ImmutableSet.of().stream(); + } + }) .collect(toImmutableSet()); Set tableStatistics = ImmutableSet.of(ROW_COUNT); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java index fe4c51aa75492..0a39ca0bd3667 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java @@ -18,9 +18,9 @@ import com.facebook.presto.hive.OrcFileWriterConfig; import com.facebook.presto.hive.ParquetFileWriterConfig; import com.facebook.presto.iceberg.nessie.NessieConfig; -import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.session.PropertyMetadata; +import com.facebook.presto.spi.statistics.ColumnStatisticType; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import io.airlift.units.DataSize; @@ -28,7 +28,10 @@ import javax.inject.Inject; +import java.util.Arrays; +import java.util.EnumSet; import java.util.List; +import java.util.stream.Collectors; import static 
com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType; @@ -36,6 +39,9 @@ import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty; import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty; import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_NON_NULL_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES; public final class IcebergSessionProperties { @@ -156,14 +162,15 @@ public IcebergSessionProperties( false), new PropertyMetadata<>( HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, - "choose how to include statistics from the Hive Metastore when calculating table stats. Valid values are: " - + Joiner.on(", ").join(HiveStatisticsMergeStrategy.values()), + "Flags to choose which statistics from the Hive Metastore are used when calculating table stats. Valid values are: " + + Joiner.on(", ").join(NUMBER_OF_DISTINCT_VALUES, NUMBER_OF_NON_NULL_VALUES, TOTAL_SIZE_IN_BYTES), VARCHAR, - HiveStatisticsMergeStrategy.class, - icebergConfig.getHiveStatisticsMergeStrategy(), + EnumSet.class, + icebergConfig.getHiveStatisticsMergeFlags(), false, - val -> HiveStatisticsMergeStrategy.valueOf((String) val), - HiveStatisticsMergeStrategy::name), + val -> EnumSet.copyOf(Arrays.stream(((String) val).trim().split(",")).map(ColumnStatisticType::valueOf) + .collect(Collectors.toSet())), + set -> Joiner.on(",").join(((EnumSet) set).stream().map(Enum::name).iterator())), booleanProperty( PUSHDOWN_FILTER_ENABLED, "Experimental: Enable Filter Pushdown for Iceberg. 
This is only supported with Native Worker.", @@ -272,9 +279,9 @@ public static boolean isMergeOnReadModeEnabled(ConnectorSession session) return session.getProperty(MERGE_ON_READ_MODE_ENABLED, Boolean.class); } - public static HiveStatisticsMergeStrategy getHiveStatisticsMergeStrategy(ConnectorSession session) + public static EnumSet getHiveStatisticsMergeStrategy(ConnectorSession session) { - return session.getProperty(HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, HiveStatisticsMergeStrategy.class); + return session.getProperty(HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, EnumSet.class); } public static boolean isPushdownFilterEnabled(ConnectorSession session) diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java index a7c26a6694456..29343c63e0b73 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java @@ -14,6 +14,7 @@ package com.facebook.presto.iceberg; import com.facebook.airlift.log.Logger; +import com.facebook.presto.common.block.Block; import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.type.FixedWidthType; import com.facebook.presto.common.type.TypeManager; @@ -22,6 +23,7 @@ import com.facebook.presto.spi.Constraint; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.statistics.ColumnStatisticMetadata; +import com.facebook.presto.spi.statistics.ColumnStatisticType; import com.facebook.presto.spi.statistics.ColumnStatistics; import com.facebook.presto.spi.statistics.ComputedStatistics; import com.facebook.presto.spi.statistics.DoubleRange; @@ -44,13 +46,17 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.BlobMetadata; import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinReader; import org.apache.iceberg.puffin.PuffinWriter; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; import java.io.IOException; import java.io.UncheckedIOException; @@ -68,6 +74,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.DateType.DATE; import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE; @@ -82,6 +89,7 @@ import static com.facebook.presto.iceberg.Partition.toMap; import static com.facebook.presto.iceberg.TypeConverter.toPrestoType; import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; import static java.lang.Long.parseLong; @@ -95,7 +103,9 @@ public class TableStatisticsMaker { private static final Logger log = Logger.get(TableStatisticsMaker.class); private static final String ICEBERG_THETA_SKETCH_BLOB_TYPE_ID = 
"apache-datasketches-theta-v1"; + private static final String ICEBERG_DATA_SIZE_BLOB_TYPE_ID = "presto-sum-data-size-bytes-v1"; private static final String ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY = "ndv"; + private static final String ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY = "data_size"; private final Table icebergTable; private final ConnectorSession session; private final TypeManager typeManager; @@ -107,6 +117,16 @@ private TableStatisticsMaker(Table icebergTable, ConnectorSession session, TypeM this.typeManager = typeManager; } + private static final Map puffinStatWriters = ImmutableMap.builder() + .put(NUMBER_OF_DISTINCT_VALUES, TableStatisticsMaker::generateNDVBlob) + .put(TOTAL_SIZE_IN_BYTES, TableStatisticsMaker::generateStatSizeBlob) + .build(); + + private static final Map puffinStatReaders = ImmutableMap.builder() + .put(ICEBERG_THETA_SKETCH_BLOB_TYPE_ID, TableStatisticsMaker::readNDVBlob) + .put(ICEBERG_DATA_SIZE_BLOB_TYPE_ID, TableStatisticsMaker::readDataSizeBlob) + .build(); + public static TableStatistics getTableStatistics(ConnectorSession session, TypeManager typeManager, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable, List columns) { return new TableStatisticsMaker(icebergTable, session, typeManager).makeTableStatistics(tableHandle, constraint, columns); @@ -164,7 +184,7 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons result.setRowCount(Estimate.of(recordCount)); result.setTotalSize(Estimate.of(summary.getSize())); Map tableStats = getClosestStatisticsFileForSnapshot(tableHandle) - .map(TableStatisticsMaker::loadStatisticsFile).orElseGet(Collections::emptyMap); + .map(this::loadStatisticsFile).orElseGet(Collections::emptyMap); for (IcebergColumnHandle columnHandle : selectedColumns) { int fieldId = columnHandle.getId(); ColumnStatistics.Builder columnBuilder = tableStats.getOrDefault(fieldId, ColumnStatistics.builder()); @@ -172,12 +192,6 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons if (nullCount != null) { columnBuilder.setNullsFraction(Estimate.of(nullCount / recordCount)); } - if (summary.getColumnSizes() != null) { - Long columnSize = summary.getColumnSizes().get(fieldId); - if (columnSize != null) { - columnBuilder.setDataSize(Estimate.of(columnSize)); - } - } Object min = summary.getMinValues().get(fieldId); Object max = summary.getMaxValues().get(fieldId); if (min instanceof Number && max instanceof Number) { @@ -282,24 +296,10 @@ private void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle ta .flatMap(map -> map.entrySet().stream()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) .forEach((key, value) -> { - if (!key.getStatisticType().equals(NUMBER_OF_DISTINCT_VALUES)) { - return; - } - Optional id = Optional.ofNullable(icebergTable.schema().findField(key.getColumnName())).map(Types.NestedField::fieldId); - if (!id.isPresent()) { - log.warn("failed to find column name %s in schema of table %s when writing distinct value statistics", key.getColumnName(), icebergTable.name()); - throw new PrestoException(ICEBERG_INVALID_METADATA, format("failed to find column name %s in schema of table %s when writing distinct value statistics", key.getColumnName(), icebergTable.name())); - } - ByteBuffer raw = VARBINARY.getSlice(value, 0).toByteBuffer(); - CompactSketch sketch = CompactSketch.wrap(Memory.wrap(raw, ByteOrder.nativeOrder())); - writer.add(new Blob( - ICEBERG_THETA_SKETCH_BLOB_TYPE_ID, - ImmutableList.of(id.get()), - 
snapshot.snapshotId(), - snapshot.sequenceNumber(), - raw, - null, - ImmutableMap.of(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY, Long.toString((long) sketch.getEstimate())))); + Optional.ofNullable(puffinStatWriters.get(key.getStatisticType())) + .ifPresent(generator -> { + writer.add(generator.generate(key, value, icebergTable, snapshot)); + }); }); writer.finish(); icebergTable.updateStatistics().setStatistics( @@ -321,6 +321,89 @@ private void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle ta } } + @FunctionalInterface + private interface PuffinBlobGenerator + { + Blob generate(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot); + } + + @FunctionalInterface + private interface PuffinBlobReader + { + /** + * Reads the stats from the blob and then updates the stats builder argument. + */ + void read(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder stats); + } + + private static Blob generateNDVBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot) + { + int id = getFieldId(metadata, icebergTable); + ByteBuffer raw = VARBINARY.getSlice(value, 0).toByteBuffer(); + CompactSketch sketch = CompactSketch.wrap(Memory.wrap(raw, ByteOrder.nativeOrder())); + return new Blob( + ICEBERG_THETA_SKETCH_BLOB_TYPE_ID, + ImmutableList.of(id), + snapshot.snapshotId(), + snapshot.sequenceNumber(), + raw, + null, + ImmutableMap.of(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY, Long.toString((long) sketch.getEstimate()))); + } + + private static Blob generateStatSizeBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot) + { + int id = getFieldId(metadata, icebergTable); + long size = BIGINT.getLong(value, 0); + return new Blob( + ICEBERG_DATA_SIZE_BLOB_TYPE_ID, + ImmutableList.of(id), + snapshot.snapshotId(), + snapshot.sequenceNumber(), + ByteBuffer.allocate(0), // empty bytebuffer since the value is just stored on the blob properties + null, + ImmutableMap.of(ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY, Long.toString(size))); + } + + private static void readNDVBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics) + { + Optional.ofNullable(metadata.properties().get(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY)) + .ifPresent(ndvProp -> { + try { + long ndv = parseLong(ndvProp); + statistics.setDistinctValuesCount(Estimate.of(ndv)); + } + catch (NumberFormatException e) { + statistics.setDistinctValuesCount(Estimate.unknown()); + log.warn("bad long value when parsing NDVs for statistics file blob %s. bad value: %s", metadata.type(), ndvProp); + } + }); + } + + private static void readDataSizeBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics) + { + Optional.ofNullable(metadata.properties().get(ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY)) + .ifPresent(sizeProp -> { + try { + long size = parseLong(sizeProp); + statistics.setDataSize(Estimate.of(size)); + } + catch (NumberFormatException e) { + statistics.setDataSize(Estimate.unknown()); + log.warn("bad long value when parsing data size from statistics file blob %s.
bad value: %d", metadata.type(), sizeProp); + } + }); + } + + private static int getFieldId(ColumnStatisticMetadata metadata, Table icebergTable) + { + return Optional.ofNullable(icebergTable.schema().findField(metadata.getColumnName())).map(Types.NestedField::fieldId) + .orElseThrow(() -> { + log.warn("failed to find column name %s in schema of table %s", metadata.getColumnName(), icebergTable.name()); + return new PrestoException(ICEBERG_INVALID_METADATA, format("failed to find column name %s in schema of table %s", metadata.getColumnName(), icebergTable.name())); + }); + } + public void updateColumnSizes(Partition summary, Map addedColumnSizes) { Map columnSizes = summary.getColumnSizes(); @@ -424,29 +507,35 @@ private Optional getClosestStatisticsFileForSnapshot(IcebergTabl /** * Builds a map of field ID to ColumnStatistics for a particular {@link StatisticsFile}. - * - * @return */ - private static Map loadStatisticsFile(StatisticsFile file) + private Map loadStatisticsFile(StatisticsFile file) { - ImmutableMap.Builder result = ImmutableMap.builder(); - file.blobMetadata().forEach(blob -> { - Integer field = getOnlyElement(blob.fields()); - ColumnStatistics.Builder colStats = ColumnStatistics.builder(); - Optional.ofNullable(blob.properties().get(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY)) - .ifPresent(ndvProp -> { - try { - long ndv = parseLong(ndvProp); - colStats.setDistinctValuesCount(Estimate.of(ndv)); - } - catch (NumberFormatException e) { - colStats.setDistinctValuesCount(Estimate.unknown()); - log.warn("bad long value when parsing statistics file %s, bad value: %d", file.path(), ndvProp); - } - }); - result.put(field, colStats); - }); - return result.build(); + Map result = new HashMap<>(); + try (FileIO io = icebergTable.io()) { + InputFile inputFile = io.newInputFile(file.path()); + try (PuffinReader reader = Puffin.read(inputFile).build()) { + for (Pair data : reader.readAll(reader.fileMetadata().blobs())) { + BlobMetadata metadata = data.first(); + ByteBuffer blob = data.second(); + Integer field = getOnlyElement(metadata.inputFields()); + Optional.ofNullable(puffinStatReaders.get(metadata.type())) + .ifPresent(statReader -> { + result.compute(field, (key, value) -> { + if (value == null) { + value = ColumnStatistics.builder(); + } + statReader.read(metadata, blob, value); + return value; + }); + }); + } + } + catch (IOException e) { + throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "failed to read statistics file at " + file.path(), e); + } + } + + return ImmutableMap.copyOf(result); } public static List getSupportedColumnStatistics(String columnName, com.facebook.presto.common.type.Type type) @@ -459,6 +548,10 @@ public static List getSupportedColumnStatistics(String supportedStatistics.add(NUMBER_OF_DISTINCT_VALUES.getColumnStatisticMetadataWithCustomFunction(columnName, "sketch_theta")); } + if (!(type instanceof FixedWidthType)) { + supportedStatistics.add(TOTAL_SIZE_IN_BYTES.getColumnStatisticMetadata(columnName)); + } + return supportedStatistics.build(); } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/HiveStatisticsMergeStrategy.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/HiveStatisticsMergeStrategy.java deleted file mode 100644 index 419a04cad263f..0000000000000 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/HiveStatisticsMergeStrategy.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in 
compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.iceberg.util; - -/** - * strategies that define how to merge hive column statistics into Iceberg column statistics. - */ -public enum HiveStatisticsMergeStrategy -{ - /** - * Do not merge statistics from Hive - */ - NONE, - /** - * Only merge NDV statistics from hive - */ - USE_NDV, - /** - * Only merge null fractions from hive - */ - USE_NULLS_FRACTIONS, - /** - * Merge both null fractions and NDVs from Hive - */ - USE_NULLS_FRACTION_AND_NDV, -} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java index c64b1218fdfd7..1695925e04fa9 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java @@ -16,17 +16,17 @@ import com.facebook.presto.hive.metastore.HiveColumnStatistics; import com.facebook.presto.hive.metastore.PartitionStatistics; import com.facebook.presto.iceberg.IcebergColumnHandle; +import com.facebook.presto.spi.statistics.ColumnStatisticType; import com.facebook.presto.spi.statistics.ColumnStatistics; import com.facebook.presto.spi.statistics.Estimate; import com.facebook.presto.spi.statistics.TableStatistics; import org.apache.iceberg.PartitionSpec; +import java.util.EnumSet; import java.util.Map; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.NONE; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NDV; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NULLS_FRACTIONS; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NULLS_FRACTION_AND_NDV; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_NON_NULL_VALUES; public final class StatisticsUtil { @@ -34,9 +34,16 @@ private StatisticsUtil() { } - public static TableStatistics mergeHiveStatistics(TableStatistics icebergStatistics, PartitionStatistics hiveStatistics, HiveStatisticsMergeStrategy mergeStrategy, PartitionSpec spec) + /** + * Attempts to merge statistics from Iceberg and hive tables. + *
+ * If a statistic is unknown on the Iceberg table, but known in Hive, the Hive statistic + * will always be used. Otherwise, hive statistics are only used if specified in the + * hive-statistic-merge-strategy + */ + public static TableStatistics mergeHiveStatistics(TableStatistics icebergStatistics, PartitionStatistics hiveStatistics, EnumSet mergeFlags, PartitionSpec spec) { - if (mergeStrategy.equals(NONE) || spec.isPartitioned()) { + if (spec.isPartitioned()) { return icebergStatistics; } // We really only need to merge in NDVs and null fractions from the column statistics in hive's stats @@ -57,10 +64,12 @@ public static TableStatistics mergeHiveStatistics(TableStatistics icebergStatist .setDistinctValuesCount(icebergColumnStats.getDistinctValuesCount()) .setRange(icebergColumnStats.getRange()); if (hiveColumnStats != null) { - if (mergeStrategy.equals(USE_NDV) || mergeStrategy.equals(USE_NULLS_FRACTION_AND_NDV)) { + // NDVs + if (mergeFlags.contains(NUMBER_OF_DISTINCT_VALUES)) { hiveColumnStats.getDistinctValuesCount().ifPresent(ndvs -> mergedStats.setDistinctValuesCount(Estimate.of(ndvs))); } - if (mergeStrategy.equals(USE_NULLS_FRACTIONS) || mergeStrategy.equals(USE_NULLS_FRACTION_AND_NDV)) { + // Null count + if (mergeFlags.contains(NUMBER_OF_NON_NULL_VALUES)) { hiveColumnStats.getNullsCount().ifPresent(nullCount -> { Estimate nullsFraction; if (!hiveStatistics.getBasicStatistics().getRowCount().isPresent()) { @@ -77,6 +86,10 @@ public static TableStatistics mergeHiveStatistics(TableStatistics icebergStatist mergedStats.setNullsFraction(nullsFraction); }); } + // data size + if (mergeFlags.contains(ColumnStatisticType.TOTAL_SIZE_IN_BYTES)) { + hiveColumnStats.getTotalSizeInBytes().ifPresent(size -> mergedStats.setDataSize(Estimate.of(size))); + } } statsBuilder.setColumnStatistics(columnHandle, mergedStats.build()); }); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index f808f3c59d6a7..3cac3903b5ee0 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -31,6 +31,7 @@ import com.facebook.presto.hive.authentication.NoHdfsAuthentication; import com.facebook.presto.iceberg.delete.DeleteFile; import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.Constraint; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.TableHandle; @@ -633,49 +634,92 @@ public void testStringFilters() } @Test - public void testReadWriteNDVs() + public void testReadWriteStats() { - assertUpdate("CREATE TABLE test_stat_ndv (col0 int)"); - assertTrue(getQueryRunner().tableExists(getSession(), "test_stat_ndv")); - assertTableColumnNames("test_stat_ndv", "col0"); + assertUpdate("CREATE TABLE test_stats (col0 int, col1 varchar)"); + assertTrue(getQueryRunner().tableExists(getSession(), "test_stats")); + assertTableColumnNames("test_stats", "col0", "col1"); // test that stats don't exist before analyze - TableStatistics stats = getTableStats("test_stat_ndv"); - assertTrue(stats.getColumnStatistics().isEmpty()); + Function, Map> remapper = (input) -> input.entrySet().stream().collect(Collectors.toMap(e -> ((IcebergColumnHandle) e.getKey()).getName(), Map.Entry::getValue)); + Map columnStats; + TableStatistics stats = 
getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + assertTrue(columnStats.isEmpty()); // test after simple insert we get a good estimate - assertUpdate("INSERT INTO test_stat_ndv VALUES 1, 2, 3", 3); - getQueryRunner().execute("ANALYZE test_stat_ndv"); - stats = getTableStats("test_stat_ndv"); - assertEquals(stats.getColumnStatistics().values().stream().findFirst().get().getDistinctValuesCount(), Estimate.of(3.0)); + assertUpdate("INSERT INTO test_stats VALUES (1, 'abc'), (2, 'xyz'), (3, 'lmnopqrst')", 3); + getQueryRunner().execute("ANALYZE test_stats"); + stats = getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + ColumnStatistics columnStat = columnStats.get("col0"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + assertEquals(columnStat.getDataSize(), Estimate.unknown()); + columnStat = columnStats.get("col1"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + double dataSize = (double) (long) getQueryRunner().execute("SELECT sum_data_size_for_stats(col1) FROM test_stats").getOnlyValue(); + assertEquals(columnStat.getDataSize().getValue(), dataSize); // test after inserting the same values, we still get the same estimate - assertUpdate("INSERT INTO test_stat_ndv VALUES 1, 2, 3", 3); - stats = getTableStats("test_stat_ndv"); - assertEquals(stats.getColumnStatistics().values().stream().findFirst().get().getDistinctValuesCount(), Estimate.of(3.0)); - - // test after ANALYZING with the new inserts that the NDV estimate is the same - getQueryRunner().execute("ANALYZE test_stat_ndv"); - stats = getTableStats("test_stat_ndv"); - assertEquals(stats.getColumnStatistics().values().stream().findFirst().get().getDistinctValuesCount(), Estimate.of(3.0)); + assertUpdate("INSERT INTO test_stats VALUES (1, 'abc'), (2, 'xyz'), (3, 'lmnopqrst')", 3); + stats = getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + columnStat = columnStats.get("col0"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + assertEquals(columnStat.getDataSize(), Estimate.unknown()); + columnStat = columnStats.get("col1"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + assertEquals(columnStat.getDataSize().getValue(), dataSize); + + // test after ANALYZING with the new inserts that the NDV estimate is the same and the data size matches + getQueryRunner().execute("ANALYZE test_stats"); + stats = getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + columnStat = columnStats.get("col0"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + assertEquals(columnStat.getDataSize(), Estimate.unknown()); + columnStat = columnStats.get("col1"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + dataSize = (double) (long) getQueryRunner().execute("SELECT sum_data_size_for_stats(col1) FROM test_stats").getOnlyValue(); + assertEquals(columnStat.getDataSize().getValue(), dataSize); // test after inserting a new value, but not analyzing, the estimate is the same. 
- assertUpdate("INSERT INTO test_stat_ndv VALUES 4", 1); - stats = getTableStats("test_stat_ndv"); - assertEquals(stats.getColumnStatistics().values().stream().findFirst().get().getDistinctValuesCount(), Estimate.of(3.0)); + assertUpdate("INSERT INTO test_stats VALUES (4, 'def')", 1); + stats = getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + columnStat = columnStats.get("col0"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + assertEquals(columnStat.getDataSize(), Estimate.unknown()); + columnStat = columnStats.get("col1"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0)); + assertEquals(columnStat.getDataSize().getValue(), dataSize); // test that after analyzing, the updates stats show up. - getQueryRunner().execute("ANALYZE test_stat_ndv"); - stats = getTableStats("test_stat_ndv"); - assertEquals(stats.getColumnStatistics().values().stream().findFirst().get().getDistinctValuesCount(), Estimate.of(4.0)); + getQueryRunner().execute("ANALYZE test_stats"); + stats = getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + columnStat = columnStats.get("col0"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(4.0)); + assertEquals(columnStat.getDataSize(), Estimate.unknown()); + columnStat = columnStats.get("col1"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(4.0)); + dataSize = (double) (long) getQueryRunner().execute("SELECT sum_data_size_for_stats(col1) FROM test_stats").getOnlyValue(); + assertEquals(columnStat.getDataSize().getValue(), dataSize); // test adding a null value is successful, and analyze still runs successfully - assertUpdate("INSERT INTO test_stat_ndv VALUES NULL", 1); - assertQuerySucceeds("ANALYZE test_stat_ndv"); - stats = getTableStats("test_stat_ndv"); - assertEquals(stats.getColumnStatistics().values().stream().findFirst().get().getDistinctValuesCount(), Estimate.of(4.0)); - - assertUpdate("DROP TABLE test_stat_ndv"); + assertUpdate("INSERT INTO test_stats VALUES (NULL, NULL)", 1); + assertQuerySucceeds("ANALYZE test_stats"); + stats = getTableStats("test_stats"); + columnStats = remapper.apply(stats.getColumnStatistics()); + columnStat = columnStats.get("col0"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(4.0)); + assertEquals(columnStat.getDataSize(), Estimate.unknown()); + columnStat = columnStats.get("col1"); + assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(4.0)); + dataSize = (double) (long) getQueryRunner().execute("SELECT sum_data_size_for_stats(col1) FROM test_stats").getOnlyValue(); + assertEquals(columnStat.getDataSize().getValue(), dataSize); + + assertUpdate("DROP TABLE test_stats"); } @Test @@ -817,6 +861,7 @@ public void testStatsDataSizePrimitives() { assertUpdate("CREATE TABLE test_stat_data_size (c0 int, c1 bigint, c2 double, c3 decimal(4, 0), c4 varchar, c5 varchar(10), c6 date, c7 time, c8 timestamp, c10 boolean)"); assertUpdate("INSERT INTO test_stat_data_size VALUES (0, 1, 2.0, CAST(4.01 as decimal(4, 0)), 'testvc', 'testvc10', date '2024-03-14', localtime, localtimestamp, TRUE)", 1); + assertQuerySucceeds("ANALYZE test_stat_data_size"); TableStatistics stats = getTableStats("test_stat_data_size"); stats.getColumnStatistics().entrySet().stream() .filter((e) -> ((IcebergColumnHandle) e.getKey()).getColumnType() != SYNTHESIZED) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java 
b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java index 7d84025f72ba5..90a4e3e4a6dff 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java @@ -13,7 +13,6 @@ */ package com.facebook.presto.iceberg; -import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy; import com.google.common.collect.ImmutableMap; import org.apache.iceberg.hadoop.HadoopFileIO; import org.testng.annotations.Test; @@ -29,7 +28,8 @@ import static com.facebook.presto.iceberg.CatalogType.HIVE; import static com.facebook.presto.iceberg.IcebergFileFormat.ORC; import static com.facebook.presto.iceberg.IcebergFileFormat.PARQUET; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NDV; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_NON_NULL_VALUES; import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT; import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT; import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT; @@ -46,7 +46,7 @@ public void testDefaults() .setCatalogWarehouse(null) .setCatalogCacheSize(10) .setHadoopConfigResources(null) - .setHiveStatisticsMergeStrategy(HiveStatisticsMergeStrategy.NONE) + .setHiveStatisticsMergeFlags("") .setStatisticSnapshotRecordDifferenceWeight(0.0) .setMaxPartitionsPerWriter(100) .setMinimumAssignedSplitWeight(0.05) @@ -76,7 +76,7 @@ public void testExplicitPropertyMappings() .put("iceberg.enable-parquet-dereference-pushdown", "false") .put("iceberg.enable-merge-on-read-mode", "false") .put("iceberg.statistic-snapshot-record-difference-weight", "1.0") - .put("iceberg.hive-statistics-merge-strategy", "USE_NDV") + .put("iceberg.hive-statistics-merge-strategy", NUMBER_OF_DISTINCT_VALUES.name() + "," + NUMBER_OF_NON_NULL_VALUES.name()) .put("iceberg.pushdown-filter-enabled", "true") .put("iceberg.delete-as-join-rewrite-enabled", "false") .put("iceberg.io.manifest.cache-enabled", "true") @@ -98,7 +98,7 @@ public void testExplicitPropertyMappings() .setStatisticSnapshotRecordDifferenceWeight(1.0) .setParquetDereferencePushdownEnabled(false) .setMergeOnReadModeEnabled(false) - .setHiveStatisticsMergeStrategy(USE_NDV) + .setHiveStatisticsMergeFlags("NUMBER_OF_DISTINCT_VALUES,NUMBER_OF_NON_NULL_VALUES") .setPushdownFilterEnabled(true) .setDeleteAsJoinRewriteEnabled(false) .setManifestCachingEnabled(true) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTableChangelog.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTableChangelog.java index 57a7201bc090d..2d8eba1586e74 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTableChangelog.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTableChangelog.java @@ -20,7 +20,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.util.Map; import java.util.stream.Collectors; import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; @@ -32,8 +31,7 @@ public class TestIcebergTableChangelog protected QueryRunner createQueryRunner() throws Exception { - Map properties = ImmutableMap.of("http-server.http.port", "8080"); - return 
createIcebergQueryRunner(properties, CatalogType.HADOOP); + return createIcebergQueryRunner(ImmutableMap.of(), CatalogType.HADOOP); } private long[] snapshots = new long[0]; diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java index 6d412d20bcda2..1f50f4c31457f 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java @@ -17,6 +17,7 @@ import com.facebook.presto.hive.metastore.HiveColumnStatistics; import com.facebook.presto.hive.metastore.PartitionStatistics; import com.facebook.presto.iceberg.ColumnIdentity.TypeCategory; +import com.facebook.presto.spi.statistics.ColumnStatisticType; import com.facebook.presto.spi.statistics.ColumnStatistics; import com.facebook.presto.spi.statistics.DoubleRange; import com.facebook.presto.spi.statistics.Estimate; @@ -28,16 +29,15 @@ import org.testng.annotations.Test; import java.util.Collections; +import java.util.EnumSet; import java.util.Optional; import java.util.OptionalLong; import static com.facebook.presto.common.type.IntegerType.INTEGER; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.NONE; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NDV; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NULLS_FRACTIONS; -import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NULLS_FRACTION_AND_NDV; import static com.facebook.presto.iceberg.util.StatisticsUtil.mergeHiveStatistics; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_NON_NULL_VALUES; import static org.testng.Assert.assertEquals; public class TestStatisticsUtil @@ -45,7 +45,7 @@ public class TestStatisticsUtil @Test public void testMergeStrategyNone() { - TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), NONE, PartitionSpec.unpartitioned()); + TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), EnumSet.noneOf(ColumnStatisticType.class), PartitionSpec.unpartitioned()); assertEquals(Estimate.of(1), merged.getRowCount()); assertEquals(Estimate.unknown(), merged.getTotalSize()); assertEquals(1, merged.getColumnStatistics().size()); @@ -59,7 +59,7 @@ public void testMergeStrategyNone() @Test public void testMergeStrategyWithPartitioned() { - TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), USE_NULLS_FRACTION_AND_NDV, + TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), EnumSet.of(NUMBER_OF_DISTINCT_VALUES, NUMBER_OF_NON_NULL_VALUES), PartitionSpec.builderFor(new Schema(Types.NestedField.required(0, "test", Types.IntegerType.get()))).bucket("test", 100).build()); assertEquals(Estimate.of(1), merged.getRowCount()); assertEquals(Estimate.unknown(), merged.getTotalSize()); @@ -74,7 +74,7 @@ public void testMergeStrategyWithPartitioned() @Test public void testMergeStrategyNDVs() { - TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), 
generateSingleColumnHiveStatistics(), USE_NDV, PartitionSpec.unpartitioned()); + TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), EnumSet.of(NUMBER_OF_DISTINCT_VALUES), PartitionSpec.unpartitioned()); assertEquals(Estimate.of(1), merged.getRowCount()); assertEquals(Estimate.unknown(), merged.getTotalSize()); assertEquals(1, merged.getColumnStatistics().size()); @@ -88,7 +88,7 @@ public void testMergeStrategyNDVs() @Test public void testMergeStrategyNulls() { - TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), USE_NULLS_FRACTIONS, PartitionSpec.unpartitioned()); + TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), EnumSet.of(NUMBER_OF_NON_NULL_VALUES), PartitionSpec.unpartitioned()); assertEquals(Estimate.of(1), merged.getRowCount()); assertEquals(Estimate.unknown(), merged.getTotalSize()); assertEquals(1, merged.getColumnStatistics().size()); @@ -102,7 +102,7 @@ public void testMergeStrategyNulls() @Test public void testMergeStrategyNDVsAndNulls() { - TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), USE_NULLS_FRACTION_AND_NDV, PartitionSpec.unpartitioned()); + TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), EnumSet.of(NUMBER_OF_DISTINCT_VALUES, NUMBER_OF_NON_NULL_VALUES), PartitionSpec.unpartitioned()); assertEquals(Estimate.of(1), merged.getRowCount()); assertEquals(Estimate.unknown(), merged.getTotalSize()); assertEquals(1, merged.getColumnStatistics().size()); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java index 1945e1122b8b0..c76babbd0cecc 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java @@ -20,6 +20,7 @@ import com.facebook.presto.metadata.CatalogManager; import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.SchemaTableName; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import org.apache.iceberg.Table; import org.testng.annotations.Test; @@ -27,6 +28,8 @@ import static com.facebook.presto.hive.metastore.InMemoryCachingHiveMetastore.memoizeMetastore; import static com.facebook.presto.iceberg.CatalogType.HIVE; import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES; @Test public class TestIcebergDistributedHive @@ -34,7 +37,7 @@ public class TestIcebergDistributedHive { public TestIcebergDistributedHive() { - super(HIVE, ImmutableMap.of("iceberg.hive-statistics-merge-strategy", "USE_NULLS_FRACTION_AND_NDV")); + super(HIVE, ImmutableMap.of("iceberg.hive-statistics-merge-strategy", Joiner.on(",").join(NUMBER_OF_DISTINCT_VALUES.name(), TOTAL_SIZE_IN_BYTES.name()))); } @Override diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java 
index c6c4d80896abc..8e587e3c9d97b 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java @@ -50,6 +50,7 @@ import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR; import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; +import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES; import static com.facebook.presto.testing.assertions.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -67,7 +68,7 @@ public class TestIcebergHiveStatistics protected QueryRunner createQueryRunner() throws Exception { - return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of("iceberg.hive-statistics-merge-strategy", "USE_NDV")); + return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of("iceberg.hive-statistics-merge-strategy", NUMBER_OF_DISTINCT_VALUES.name())); } private static final Set NUMERIC_ORDERS_COLUMNS = ImmutableSet.builder()
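To make the behavior these tests exercise concrete, the following SQL sketch mirrors ``testReadWriteStats``: statistics are written by ``ANALYZE`` (NDV theta sketches where supported, plus data-size blobs for variable-width columns such as ``varchar``) and can then be inspected. Catalog, schema, and table names are illustrative assumptions:

```sql
-- Illustrative workflow, mirroring testReadWriteStats (names are examples).
CREATE TABLE iceberg.tpch.test_stats (col0 int, col1 varchar);

INSERT INTO iceberg.tpch.test_stats VALUES (1, 'abc'), (2, 'xyz'), (3, 'lmnopqrst');

-- Writes NDV sketches and, for variable-width columns, data-size statistics to a Puffin file.
ANALYZE iceberg.tpch.test_stats;

-- Inspect the resulting estimates (distinct values, data size, nulls fraction).
SHOW STATS FOR iceberg.tpch.test_stats;
```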