Skip to content

Commit

Permalink
Collect data size statistics for Iceberg tables
Browse files Browse the repository at this point in the history
Previously, the data size statistic was computed by using the
Iceberg data manifests data size field. This is value is misleading
for Presto because it represents the compressed on-disk size.

This change allows ANALYZE to read and write data size statistic
values to puffin files.
  • Loading branch information
ZacBlanco committed Mar 28, 2024
1 parent 3c51f08 commit 79251b5
Show file tree
Hide file tree
Showing 13 changed files with 318 additions and 174 deletions.
17 changes: 11 additions & 6 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,13 @@ Property Name Description

``iceberg.enable-parquet-dereference-pushdown`` Enable parquet dereference pushdown. ``true``

``iceberg.hive-statistics-merge-strategy`` Determines how to merge statistics that are stored in the ``NONE``
Hive Metastore. The available values are ``NONE``,
``USE_NULLS_FRACTION_AND_NDV``, ``USE_NULLS_FRACTIONS``
and, ``USE_NDV``
``iceberg.hive-statistics-merge-strategy`` Comma separated list of statistics to use from the
Hive Metastore to override Iceberg table statistics.
The available values are ``NUMBER_OF_DISTINCT_VALUES``,
``NUMBER_OF_NON_NULL_VALUES``, ``TOTAL_SIZE_IN_BYTES``.

**Note**: Only valid when the Iceberg connector is
configured with Hive.

``iceberg.statistic-snapshot-record-difference-weight`` The amount that the difference in total record count matters
when calculating the closest snapshot when picking
Expand Down Expand Up @@ -306,6 +309,8 @@ Property Name Description
============================================= ======================================================================
``iceberg.delete_as_join_rewrite_enabled`` Overrides the behavior of the connector property
``iceberg.delete-as-join-rewrite-enabled`` in the current session.
``iceberg.hive_statistics_merge_strategy`` Overrides the behavior of the connector property
``iceberg.hive-statistics-merge-strategy`` in the current session.
============================================= ======================================================================

Caching Support
Expand Down Expand Up @@ -1172,7 +1177,7 @@ each Iceberg data type to the corresponding Presto data type, and from each Pres
The following tables detail the specific type maps between PrestoDB and Iceberg.

Iceberg to PrestoDB type mapping
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Map of Iceberg types to the relevant PrestoDB types:

Expand Down Expand Up @@ -1215,7 +1220,7 @@ Map of Iceberg types to the relevant PrestoDB types:
No other types are supported.

PrestoDB to Iceberg type mapping
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Map of PrestoDB types to the relevant Iceberg types:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.presto.hive.HiveCompressionCodec;
import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.hadoop.HadoopFileIO;
Expand All @@ -26,7 +26,11 @@
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static com.facebook.presto.hive.HiveCompressionCodec.GZIP;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
Expand All @@ -51,12 +55,13 @@ public class IcebergConfig
private boolean pushdownFilterEnabled;
private boolean deleteAsJoinRewriteEnabled = true;

private HiveStatisticsMergeStrategy hiveStatisticsMergeStrategy = HiveStatisticsMergeStrategy.NONE;
private EnumSet<ColumnStatisticType> hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class);
private String fileIOImpl = HadoopFileIO.class.getName();
private boolean manifestCachingEnabled;
private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;

@NotNull
public FileFormat getFileFormat()
{
Expand Down Expand Up @@ -195,16 +200,22 @@ public boolean isMergeOnReadModeEnabled()
}

@Config("iceberg.hive-statistics-merge-strategy")
@ConfigDescription("determines how to merge statistics that are stored in the Hive Metastore")
public IcebergConfig setHiveStatisticsMergeStrategy(HiveStatisticsMergeStrategy mergeStrategy)
{
this.hiveStatisticsMergeStrategy = mergeStrategy;
@ConfigDescription("Comma separated list of statistics to use from the Hive metastore to override iceberg table statistics")
public IcebergConfig setHiveStatisticsMergeFlags(String mergeFlags)
{
this.hiveStatisticsMergeFlags = Optional.of(Arrays.stream(mergeFlags.trim().split(","))
.filter(value -> !value.isEmpty())
.map(ColumnStatisticType::valueOf)
.collect(Collectors.toSet()))
.filter(set -> !set.isEmpty())
.map(EnumSet::copyOf)
.orElse(EnumSet.noneOf(ColumnStatisticType.class));
return this;
}

public HiveStatisticsMergeStrategy getHiveStatisticsMergeStrategy()
public EnumSet<ColumnStatisticType> getHiveStatisticsMergeFlags()
{
return hiveStatisticsMergeStrategy;
return hiveStatisticsMergeFlags;
}

@Config("iceberg.statistic-snapshot-record-difference-weight")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.facebook.presto.hive.metastore.PrestoTableType;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorNewTableLayout;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand All @@ -57,6 +56,7 @@
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.security.PrestoPrincipal;
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.spi.statistics.Estimate;
Expand All @@ -82,6 +82,7 @@
import java.io.IOException;
import java.time.ZoneId;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -442,7 +443,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
org.apache.iceberg.Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(session, typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
HiveStatisticsMergeStrategy mergeStrategy = getHiveStatisticsMergeStrategy(session);
EnumSet<ColumnStatisticType> mergeFlags = getHiveStatisticsMergeStrategy(session);
return tableLayoutHandle.map(IcebergTableLayoutHandle.class::cast).map(layoutHandle -> {
TupleDomain<ColumnHandle> domainPredicate = layoutHandle.getDomainPredicate()
.transform(subfield -> isEntireColumn(subfield) ? subfield.getRootName() : null)
Expand All @@ -455,7 +456,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
});
RowExpression translatedPredicate = rowExpressionService.getDomainTranslator().toPredicate(predicate);
PartitionStatistics hiveStatistics = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName());
TableStatistics mergedStatistics = mergeHiveStatistics(icebergStatistics, hiveStatistics, mergeStrategy, icebergTable.spec());
TableStatistics mergedStatistics = mergeHiveStatistics(icebergStatistics, hiveStatistics, mergeFlags, icebergTable.spec());
TableStatistics.Builder filteredStatsBuilder = TableStatistics.builder()
.setRowCount(mergedStatistics.getRowCount());
double totalSize = 0;
Expand All @@ -481,11 +482,8 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
IcebergColumnHandle::getName,
IcebergColumnHandle::getType)));
}).orElseGet(() -> {
if (!mergeStrategy.equals(HiveStatisticsMergeStrategy.NONE)) {
PartitionStatistics hiveStats = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName());
return mergeHiveStatistics(icebergStatistics, hiveStats, mergeStrategy, icebergTable.spec());
}
return icebergStatistics;
PartitionStatistics hiveStats = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getIcebergTableName().getTableName());
return mergeHiveStatistics(icebergStatistics, hiveStats, mergeFlags, icebergTable.spec());
});
}

Expand All @@ -500,9 +498,16 @@ public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession
{
Set<ColumnStatisticMetadata> columnStatistics = tableMetadata.getColumns().stream()
.filter(column -> !column.isHidden())
.flatMap(meta -> metastore.getSupportedColumnStatistics(getMetastoreContext(session), meta.getType())
.stream()
.map(statType -> statType.getColumnStatisticMetadata(meta.getName())))
.flatMap(meta -> {
try {
return metastore.getSupportedColumnStatistics(getMetastoreContext(session), meta.getType())
.stream()
.map(statType -> statType.getColumnStatisticMetadata(meta.getName()));
}
catch (IllegalArgumentException e) {
return ImmutableSet.<ColumnStatisticMetadata>of().stream();
}
})
.collect(toImmutableSet());

Set<TableStatisticType> tableStatistics = ImmutableSet.of(ROW_COUNT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,30 @@
import com.facebook.presto.hive.OrcFileWriterConfig;
import com.facebook.presto.hive.ParquetFileWriterConfig;
import com.facebook.presto.iceberg.nessie.NessieConfig;
import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import org.apache.parquet.column.ParquetProperties;

import javax.inject.Inject;

import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;

import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.doubleProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_NON_NULL_VALUES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;

public final class IcebergSessionProperties
{
Expand Down Expand Up @@ -156,14 +162,15 @@ public IcebergSessionProperties(
false),
new PropertyMetadata<>(
HIVE_METASTORE_STATISTICS_MERGE_STRATEGY,
"choose how to include statistics from the Hive Metastore when calculating table stats. Valid values are: "
+ Joiner.on(", ").join(HiveStatisticsMergeStrategy.values()),
"Flags to choose which statistics from the Hive Metastore are used when calculating table stats. Valid values are: "
+ Joiner.on(", ").join(NUMBER_OF_DISTINCT_VALUES, NUMBER_OF_NON_NULL_VALUES, TOTAL_SIZE_IN_BYTES),
VARCHAR,
HiveStatisticsMergeStrategy.class,
icebergConfig.getHiveStatisticsMergeStrategy(),
EnumSet.class,
icebergConfig.getHiveStatisticsMergeFlags(),
false,
val -> HiveStatisticsMergeStrategy.valueOf((String) val),
HiveStatisticsMergeStrategy::name),
val -> EnumSet.copyOf(Arrays.stream(((String) val).trim().split(",")).map(ColumnStatisticType::valueOf)
.collect(Collectors.toSet())),
set -> Joiner.on(",").join(((EnumSet<ColumnStatisticType>) set).stream().map(Enum::name).iterator())),
booleanProperty(
PUSHDOWN_FILTER_ENABLED,
"Experimental: Enable Filter Pushdown for Iceberg. This is only supported with Native Worker.",
Expand Down Expand Up @@ -272,9 +279,9 @@ public static boolean isMergeOnReadModeEnabled(ConnectorSession session)
return session.getProperty(MERGE_ON_READ_MODE_ENABLED, Boolean.class);
}

public static HiveStatisticsMergeStrategy getHiveStatisticsMergeStrategy(ConnectorSession session)
public static EnumSet<ColumnStatisticType> getHiveStatisticsMergeStrategy(ConnectorSession session)
{
return session.getProperty(HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, HiveStatisticsMergeStrategy.class);
return session.getProperty(HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, EnumSet.class);
}

public static boolean isPushdownFilterEnabled(ConnectorSession session)
Expand Down
Loading

0 comments on commit 79251b5

Please sign in to comment.