diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index bd1e36e7bc968b..5451a219edfd3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -399,7 +399,7 @@ public SelectedPartitions initSelectedPartitions(Optional snapshot * @param snapshot if not support mvcc, ignore this * @return partitionName ==> PartitionItem */ - protected Map getNameToPartitionItems(Optional snapshot) { + public Map getNameToPartitionItems(Optional snapshot) { return Collections.emptyMap(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 134ad362fa1eed..2115f47d777b80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -306,7 +306,7 @@ public boolean supportInternalPartitionPruned() { } @Override - protected Map getNameToPartitionItems(Optional snapshot) { + public Map getNameToPartitionItems(Optional snapshot) { return getNameToPartitionItems(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java index 0f748f59e927bc..dbbbcf2d6a1e5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java @@ -92,7 +92,7 @@ public List getPartitionColumns() { } @Override - protected Map getNameToPartitionItems(Optional snapshot) { + public Map getNameToPartitionItems(Optional snapshot) { if (getPartitionColumns().isEmpty()) { return Collections.emptyMap(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index 8e9ef1eaa97b7a..e6f384502d620f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -461,17 +461,14 @@ protected Pair>, Map>> return Pair.of(ImmutableMap.of(), ImmutableMap.of()); } // Collect the mv related base table partitions which query used - Map> queryUsedBaseTablePartitions = new LinkedHashMap<>(); + Map> queryUsedBaseTablePartitions = new LinkedHashMap<>(); queryUsedBaseTablePartitions.put(relatedPartitionTable, new HashSet<>()); queryPlan.accept(new StructInfo.QueryScanPartitionsCollector(), queryUsedBaseTablePartitions); // Bail out, not check invalid partition if not olap scan, support later if (queryUsedBaseTablePartitions.isEmpty()) { return Pair.of(ImmutableMap.of(), ImmutableMap.of()); } - Set queryUsedBaseTablePartitionNameSet = queryUsedBaseTablePartitions.get(relatedPartitionTable) - .stream() - .map(Partition::getName) - .collect(Collectors.toSet()); + Set queryUsedBaseTablePartitionNameSet = queryUsedBaseTablePartitions.get(relatedPartitionTable); Collection mvValidPartitions = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, cascadesContext.getConnectContext(), System.currentTimeMillis(), false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java index 526ec7030d2db5..a235fd677fb32e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java @@ -17,9 +17,9 @@ package org.apache.doris.nereids.rules.exploration.mv; -import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Pair; +import org.apache.doris.datasource.ExternalTable; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.jobs.executor.Rewriter; @@ -49,6 +49,8 @@ import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAdder; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; @@ -724,22 +726,28 @@ public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, * Collect partitions on base table */ public static class QueryScanPartitionsCollector extends DefaultPlanVisitor>> { + Map>> { @Override public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation, - Map> targetTablePartitionMap) { + Map> targetTablePartitionMap) { TableIf table = catalogRelation.getTable(); BaseTableInfo relatedPartitionTable = new BaseTableInfo(table); if (!targetTablePartitionMap.containsKey(relatedPartitionTable)) { return catalogRelation; } + Set tablePartitions = targetTablePartitionMap.get(relatedPartitionTable); if (catalogRelation instanceof LogicalOlapScan) { // Handle olap table LogicalOlapScan logicalOlapScan = (LogicalOlapScan) catalogRelation; - Set tablePartitions = targetTablePartitionMap.get(relatedPartitionTable); for (Long partitionId : logicalOlapScan.getSelectedPartitionIds()) { - tablePartitions.add(logicalOlapScan.getTable().getPartition(partitionId)); + tablePartitions.add(logicalOlapScan.getTable().getPartition(partitionId).getName()); } + } else if (catalogRelation instanceof LogicalFileScan + && catalogRelation.getTable() instanceof ExternalTable + && ((ExternalTable) catalogRelation.getTable()).supportInternalPartitionPruned()) { + LogicalFileScan logicalFileScan = (LogicalFileScan) catalogRelation; + SelectedPartitions selectedPartitions = logicalFileScan.getSelectedPartitions(); + tablePartitions.addAll(selectedPartitions.selectedPartitions.keySet()); } else { // todo Support other type partition table // Not support to partition check now when query external catalog table, support later. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index de284bd837748f..8210b8b552aba7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalTable; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.nereids.analyzer.UnboundRelation; @@ -301,16 +302,19 @@ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation, MTMVRelatedTableIf targetTable = (MTMVRelatedTableIf) table; for (String partitionName : filterTableEntry.getValue()) { Partition partition = targetTable.getPartition(partitionName); - if (!(targetTable instanceof OlapTable)) { - // check partition is have data or not, only support olap table - break; - } - if (!((OlapTable) targetTable).selectNonEmptyPartitionIds( + if (targetTable instanceof OlapTable && !((OlapTable) targetTable).selectNonEmptyPartitionIds( Lists.newArrayList(partition.getId())).isEmpty()) { - // Add filter only when partition has data + // Add filter only when partition has data when olap table partitionHasDataItems.add( ((OlapTable) targetTable).getPartitionInfo().getItem(partition.getId())); } + if (targetTable instanceof ExternalTable) { + // Add filter only when partition has data when external table + // TODO: 2024/12/4 real snapshot + partitionHasDataItems.add( + ((ExternalTable) targetTable).getNameToPartitionItems(Optional.empty()) + .get(partitionName)); + } } if (partitionHasDataItems.isEmpty()) { predicates.setNeedAddFilter(false); diff --git a/regression-test/data/mtmv_p0/test_hive_rewrite_mtmv.out b/regression-test/data/mtmv_p0/test_hive_rewrite_mtmv.out new file mode 100644 index 00000000000000..452cff71e53e9c --- /dev/null +++ b/regression-test/data/mtmv_p0/test_hive_rewrite_mtmv.out @@ -0,0 +1,31 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !refresh_one_partition -- +20230101 3 + +-- !refresh_one_partition_rewrite -- +20230101 3 +20230102 3 + +-- !refresh_complete -- +20230101 3 +20230102 3 + +-- !refresh_all_partition_rewrite -- +20230101 3 +20230102 3 + +-- !refresh_one_partition -- +20230101 3 + +-- !refresh_one_partition_rewrite -- +20230101 3 +20230102 3 + +-- !refresh_complete -- +20230101 3 +20230102 3 + +-- !refresh_all_partition_rewrite -- +20230101 3 +20230102 3 + diff --git a/regression-test/suites/mtmv_p0/test_hive_rewrite_mtmv.groovy b/regression-test/suites/mtmv_p0/test_hive_rewrite_mtmv.groovy new file mode 100644 index 00000000000000..f10d6bd65b4d3b --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_hive_rewrite_mtmv.groovy @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hive_rewrite_mtmv", "p0,external,hive,external_docker,external_docker_hive") { + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("diable Hive test.") + return; + } + String suiteName = "test_hive_rewrite_mtmv" + String catalogName = "${suiteName}_catalog" + String mvName = "${suiteName}_mv" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + sql """set materialized_view_rewrite_enable_contain_external_table=true;""" + String mvSql = "SELECT part_col,count(*) as num FROM ${catalogName}.`default`.mtmv_base1 group by part_col;"; + for (String hivePrefix : ["hive2", "hive3"]) { + String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") + sql """drop catalog if exists ${catalogName}""" + sql """create catalog if not exists ${catalogName} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}' + );""" + sql """analyze table ${catalogName}.`default`.mtmv_base1 with sync""" + sql """alter table ${catalogName}.`default`.mtmv_base1 modify column part_col set stats ('row_count'='6');""" + + sql """drop materialized view if exists ${mvName};""" + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`part_col`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${mvSql} + """ + def showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_20230101")) + assertTrue(showPartitionsResult.toString().contains("p_20230102")) + + // refresh one partitions + sql """ + REFRESH MATERIALIZED VIEW ${mvName} partitions(p_20230101); + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_one_partition "SELECT * FROM ${mvName}" + + def explainOnePartition = sql """ explain ${mvSql} """ + logger.info("explainOnePartition: " + explainOnePartition.toString()) + assertTrue(explainOnePartition.toString().contains("VUNION")) + assertTrue(explainOnePartition.toString().contains("part_col[#4] = 20230102")) + order_qt_refresh_one_partition_rewrite "${mvSql}" + + mv_rewrite_success("${mvSql}", "${mvName}") + + //refresh complete + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_complete "SELECT * FROM ${mvName}" + + def explainAllPartition = sql """ explain ${mvSql}; """ + logger.info("explainAllPartition: " + explainAllPartition.toString()) + assertTrue(explainAllPartition.toString().contains("VOlapScanNode")) + assertTrue(explainAllPartition.toString().contains("partitions=2/2")) + order_qt_refresh_all_partition_rewrite "${mvSql}" + + mv_rewrite_success("${mvSql}", "${mvName}") + + sql """drop materialized view if exists ${mvName};""" + sql """drop catalog if exists ${catalogName}""" + } +} +