Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8680] Enable partition stats by default #12707

Merged
14 commits merged on Jan 27, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -2090,14 +2090,18 @@ public boolean isMetadataColumnStatsIndexEnabled() {
*
* <p>The partition stats index is enabled if:
* <ul>
* <li>The column stats is enabled. Partition stats cannot be created without column stats.</li>
* <li>The metadata table is enabled and partition stats index is enabled in the metadata configuration, or</li>
* <li>The partition stats index is not explicitly marked for dropping in the metadata configuration.</li>
* </ul>
*
* @return {@code true} if the partition stats index is enabled, {@code false} otherwise.
*/
public boolean isPartitionStatsIndexEnabled() {
    // Partition stats are an aggregation of column stats, so they cannot exist
    // (and must not be reported as enabled) unless column stats are enabled.
    if (!isMetadataColumnStatsIndexEnabled()) {
      return false;
    }
    // Enabled when the metadata table and the partition stats index are both
    // switched on in the metadata configuration, or when the index has not been
    // explicitly marked for dropping (i.e. an existing index should stay active).
    // NOTE(review): && binds tighter than ||, so this reads as
    // (metadataEnabled && partitionStatsEnabled) || !dropRequested — meaning
    // "not marked for dropping" alone yields true even with the metadata table
    // disabled. This preserves the PR's behavior; confirm the grouping is intended.
    return isMetadataTableEnabled() && getMetadataConfig().isPartitionStatsIndexEnabled()
        || !isDropMetadataIndex(MetadataPartitionType.PARTITION_STATS.getPartitionPath());
  }

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,17 @@ package org.apache.hudi.functional

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.view.FileSystemViewManager
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.functional.ColumnStatIndexTestBase.{ColumnStatsTestCase, ColumnStatsTestParams}
import org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
import org.apache.spark.sql.SaveMode
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource

Expand Down Expand Up @@ -165,7 +161,8 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase {
val metadataOpts3 = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "false"
HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "false",
HoodieMetadataConfig.DROP_METADATA_INDEX.key -> COLUMN_STATS.getPartitionPath
)

// disable col stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1163,14 +1163,14 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
val unixTimestamp = resolveExpr(spark, unapply(functions.unix_timestamp(functions.col("date"), "yyyy-MM-dd")).get, tableSchema)
literal = Literal.create(1732924800L)
dataFilter = EqualTo(unixTimestamp, literal)
verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, isDataSkippingExpected = true)
// verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, isDataSkippingExpected = true)
    // NOTE(review): the assertion above was commented out in this PR and the
    // review conversation was marked resolved (codope) — confirm whether it
    // should be re-enabled or removed rather than left disabled.
spark.sql(s"drop index idx_unix on $tableName")

spark.sql(s"create index idx_to_date on $tableName using column_stats(date) options(expr='to_date', format='yyyy-MM-dd')")
metaClient = HoodieTableMetaClient.reload(metaClient)
val toDate = resolveExpr(spark, unapply(functions.to_date(functions.col("date"), "yyyy-MM-dd")).get, tableSchema)
dataFilter = EqualTo(toDate, lit(18230).expr)
verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, isDataSkippingExpected = true)
// verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, isDataSkippingExpected = true)
    // NOTE(review): the assertion above was commented out in this PR and the
    // review conversation was marked resolved (codope) — confirm whether it
    // should be re-enabled or removed rather than left disabled.
spark.sql(s"drop index idx_to_date on $tableName")
}
}
Expand Down Expand Up @@ -1258,10 +1258,13 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, isDataSkippingExpected = true)

// Validate partition stat records
      // First form the keys to validate, because partition stats get built for all columns.
val riderCalifornia = getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=california", "rider")
val riderTexas = getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=texas", "rider")
checkAnswer(s"select key, ColumnStatsMetadata.minValue.member6.value, ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName') " +
s"where type=${MetadataPartitionType.PARTITION_STATS.getRecordType}")(
Seq(getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=california", "rider"), "RIDER-A", "RIDER-C"),
Seq(getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=texas", "rider"), "RIDER-D", "RIDER-F")
s"where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} AND key IN ('$riderCalifornia', '$riderTexas')")(
Seq(riderCalifornia, "RIDER-A", "RIDER-C"),
Seq(riderTexas, "RIDER-D", "RIDER-F")
)

spark.sql(s"update $tableName set rider = 'rider-G' where id = 'trip5'")
Expand All @@ -1275,9 +1278,9 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
// For MOR table, min value would still be RIDER-D since the update is in log file and old parquet file with value RIDER-D is still present
val partitionMinRiderValue = if (isTableMOR) "RIDER-D" else "RIDER-E"
checkAnswer(s"select key, ColumnStatsMetadata.minValue.member6.value, ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName') " +
s"where type=${MetadataPartitionType.PARTITION_STATS.getRecordType}")(
Seq(getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=california", "rider"), "RIDER-A", "RIDER-C"),
Seq(getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=texas", "rider"), partitionMinRiderValue, "RIDER-G")
s"where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} AND key IN ('$riderCalifornia', '$riderTexas')")(
Seq(riderCalifornia, "RIDER-A", "RIDER-C"),
Seq(riderTexas, partitionMinRiderValue, "RIDER-G")
)

if (isTableMOR) {
Expand All @@ -1293,9 +1296,9 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {

// Validate partition stat records after update
checkAnswer(s"select key, ColumnStatsMetadata.minValue.member6.value, ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName') " +
s"where type=${MetadataPartitionType.PARTITION_STATS.getRecordType}")(
Seq(getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=california", "rider"), "RIDER-A", "RIDER-C"),
Seq(getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=texas", "rider"), "RIDER-E", "RIDER-H")
s"where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} AND key IN ('$riderCalifornia', '$riderTexas')")(
Seq(riderCalifornia, "RIDER-A", "RIDER-C"),
Seq(riderTexas, "RIDER-E", "RIDER-H")
)

if (isTableMOR) {
Expand Down Expand Up @@ -1388,10 +1391,13 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, isDataSkippingExpected = true)

// Validate partition stat records
      // First form the keys to validate, because partition stats get built for all columns.
val riderCalifornia = getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=california", "rider")
val riderTexas = getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=texas", "rider")
checkAnswer(s"select key, ColumnStatsMetadata.minValue.member6.value, ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName') " +
s"where type=${MetadataPartitionType.PARTITION_STATS.getRecordType}")(
Seq(getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=california", "rider"), "RIDER-A", "RIDER-C"),
Seq(getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=texas", "rider"), "RIDER-D", "RIDER-F")
s"where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} AND key IN ('$riderCalifornia', '$riderTexas')")(
Seq(riderCalifornia, "RIDER-A", "RIDER-C"),
Seq(riderTexas, "RIDER-D", "RIDER-F")
)

spark.sql(s"delete from $tableName where id = 'trip5'")
Expand All @@ -1405,9 +1411,9 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
// For MOR table, min value would still be RIDER-D since the delete is in log file and old parquet file with value RIDER-D is still present
val partitionMinRiderValue = if (isTableMOR) "RIDER-D" else "RIDER-E"
checkAnswer(s"select key, ColumnStatsMetadata.minValue.member6.value, ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName') " +
s"where type=${MetadataPartitionType.PARTITION_STATS.getRecordType}")(
Seq(getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=california", "rider"), "RIDER-A", "RIDER-C"),
Seq(getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=texas", "rider"), partitionMinRiderValue, "RIDER-F")
s"where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} AND key IN ('$riderCalifornia', '$riderTexas')")(
Seq(riderCalifornia, "RIDER-A", "RIDER-C"),
Seq(riderTexas, partitionMinRiderValue, "RIDER-F")
)

if (isTableMOR) {
Expand All @@ -1424,9 +1430,9 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {

// Validate partition stat records after update
checkAnswer(s"select key, ColumnStatsMetadata.minValue.member6.value, ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName') " +
s"where type=${MetadataPartitionType.PARTITION_STATS.getRecordType}")(
Seq(getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=california", "rider"), "RIDER-A", "RIDER-C"),
Seq(getPartitionStatsIndexKey(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION_STAT_PREFIX, "state=texas", "rider"), "RIDER-F", "RIDER-F")
s"where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} AND key IN ('$riderCalifornia', '$riderTexas')")(
Seq(riderCalifornia, "RIDER-A", "RIDER-C"),
Seq(riderTexas, "RIDER-F", "RIDER-F")
)

if (isTableMOR) {
Expand Down
Loading