From ae368216e5c4f8bf0183702fd1e206a1a4ddf466 Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 1 Mar 2024 16:27:26 +0800 Subject: [PATCH] [opt] refine some method name --- .../catalog/HiveExternalDistributionInfo.java | 15 ---------- .../datasource/FederationBackendPolicy.java | 2 -- .../doris/datasource/FileQueryScanNode.java | 8 +++--- .../datasource/hive/HMSExternalTable.java | 16 +++++------ .../doris/datasource/hive/HiveBucketUtil.java | 6 ++++ .../datasource/hive/source/HiveScanNode.java | 4 +-- .../translator/PhysicalPlanTranslator.java | 12 +------- .../properties/DistributionSpecHash.java | 18 +++++++++++- .../LogicalFileScanToPhysicalFileScan.java | 2 +- .../doris/planner/DistributedPlanner.java | 28 +++++++++++-------- .../java/org/apache/doris/qe/Coordinator.java | 28 ++++++++++++------- 11 files changed, 73 insertions(+), 66 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java index 5b15874401908a3..d30d0f2e36cbfb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java @@ -29,26 +29,11 @@ public class HiveExternalDistributionInfo extends HashDistributionInfo { @SerializedName(value = "bucketingVersion") private final int bucketingVersion; - public HiveExternalDistributionInfo() { - bucketingVersion = 2; - } - public HiveExternalDistributionInfo(int bucketNum, List distributionColumns, int bucketingVersion) { super(bucketNum, distributionColumns); this.bucketingVersion = bucketingVersion; } - public HiveExternalDistributionInfo(int bucketNum, boolean autoBucket, - List distributionColumns, int bucketingVersion) { - super(bucketNum, autoBucket, distributionColumns); - this.bucketingVersion = bucketingVersion; - } - - public int getBucketingVersion() { - return bucketingVersion; - } - - @Override public boolean equals(Object o) { if (this == o) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java index a7f6b9431e01049..639a2bd715f7eda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java @@ -203,8 +203,6 @@ public void init(BeSelectionPolicy policy) throws UserException { } catch (ExecutionException e) { throw new UserException("failed to get consistent hash", e); } - /*consistentBucket = new ConsistentHash<>(Hashing.murmur3_128(), new BucketHash(), - new BackendHash(), backends, Config.virtual_node_number);*/ } public Backend getNextBe() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index ae572d948b8fc57..fb24269c374663a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -369,12 +369,12 @@ public void createScanRangeLocations() throws UserException { ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID) : fileSplit.getPartitionValues(); - boolean isBucketedHiveTable = false; + boolean isSparkBucketedHiveTable = false; int bucketNum = 0; TableIf targetTable = getTargetTable(); if (targetTable instanceof HMSExternalTable) { - isBucketedHiveTable = ((HMSExternalTable) targetTable).isBucketedTable(); - if (isBucketedHiveTable) { + isSparkBucketedHiveTable = ((HMSExternalTable) targetTable).isSparkBucketedTable(); + if (isSparkBucketedHiveTable) { bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getName()).getAsInt(); } } @@ -422,7 +422,7 @@ fileSplit.getStart(), fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); } - if (isBucketedHiveTable) { + if (isSparkBucketedHiveTable) { bucketSeq2locations.put(bucketNum, curLocations); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index fbd51b3efdbe160..c6136d816f98084 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -38,6 +38,7 @@ import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.TablePartitionValues; +import org.apache.doris.datasource.hive.HiveBucketUtil.HiveBucketType; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.datasource.mvcc.MvccSnapshot; @@ -178,7 +179,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI protected volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null; protected List partitionColumns; private List bucketColumns; - private boolean isSparkTable; + private HiveBucketType hiveBucketType = HiveBucketType.NONE; private DLAType dlaType = DLAType.UNKNOWN; @@ -268,12 +269,8 @@ public boolean isHoodieCowTable() { || (params != null && "COPY_ON_WRITE".equalsIgnoreCase(params.get("flink.table.type"))); } - public boolean isSparkTable() { - return isSparkTable; - } - - public boolean isBucketedTable() { - return bucketColumns != null && !bucketColumns.isEmpty() && isSparkTable; + public boolean isSparkBucketedTable() { + return bucketColumns != null && !bucketColumns.isEmpty() && hiveBucketType == HiveBucketType.SPARK; } /** @@ -582,7 +579,7 @@ public Optional initSchema() { private void initBucketingColumns(List columns) { List bucketCols = new ArrayList<>(5); int numBuckets = getBucketColumns(bucketCols); - if (bucketCols.isEmpty() || !isSparkTable) { + if (bucketCols.isEmpty() || hiveBucketType != HiveBucketType.SPARK) { bucketColumns = ImmutableList.of(); distributionInfo = new RandomDistributionInfo(1, true); return; } @@ -619,6 +616,7 @@ private int getBucketColumns(List bucketCols) { /* Hive Bucketed Table */ bucketCols.addAll(descriptor.getBucketCols()); numBuckets = descriptor.getNumBuckets(); + hiveBucketType = HiveBucketType.HIVE; } else if (remoteTable.isSetParameters() && !Collections.disjoint(SUPPORTED_BUCKET_PROPERTIES, remoteTable.getParameters().keySet())) { Map parameters = remoteTable.getParameters(); @@ -633,7
+631,7 @@ private int getBucketColumns(List bucketCols) { } if (numBuckets > 0) { - isSparkTable = true; + hiveBucketType = HiveBucketType.SPARK; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java index ce0d9cfba98bf7e..fc0bed8d5e1c252 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java @@ -65,6 +65,12 @@ public class HiveBucketUtil { private static final Logger LOG = LogManager.getLogger(HiveBucketUtil.class); + public enum HiveBucketType { + NONE, + HIVE, + SPARK + } + private static final Set SUPPORTED_TYPES_FOR_BUCKET_FILTER = ImmutableSet.of( PrimitiveType.BOOLEAN, PrimitiveType.TINYINT, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index a785e72d3305b51..ecd6bc6173018a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -503,7 +503,7 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws User @Override public DataPartition constructInputPartitionByDistributionInfo() { - if (hmsTable.isBucketedTable()) { + if (hmsTable.isSparkBucketedTable()) { DistributionInfo distributionInfo = hmsTable.getDefaultDistributionInfo(); if (!(distributionInfo instanceof HashDistributionInfo)) { return DataPartition.RANDOM; @@ -526,7 +526,7 @@ public HMSExternalTable getHiveTable() { @Override public THashType getHashType() { - if (hmsTable.isBucketedTable() + if (hmsTable.isSparkBucketedTable() && hmsTable.getDefaultDistributionInfo() instanceof HashDistributionInfo) { return THashType.SPARK_MURMUR32; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index d335d7354a67b45..06f86b7e3302f83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -2632,17 +2632,7 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec, THashType hashType = THashType.XXHASH64; switch (distributionSpecHash.getShuffleType()) { case STORAGE_BUCKETED: - switch (distributionSpecHash.getShuffleFunction()) { - case STORAGE_BUCKET_SPARK_MURMUR32: - hashType = THashType.SPARK_MURMUR32; - break; - case STORAGE_BUCKET_CRC32: - hashType = THashType.CRC32; - break; - case STORAGE_BUCKET_XXHASH64: - default: - break; - } + hashType = distributionSpecHash.getShuffleFunction().toThrift(); partitionType = TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED; break; case EXECUTION_BUCKETED: diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java index 5bf1a7f52472bc6..3bd1ab1ae529647 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java @@ -20,6 +20,7 @@ import org.apache.doris.nereids.annotation.Developing; import 
org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.thrift.THashType; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -338,9 +339,24 @@ public enum ShuffleType { * Enums for concrete shuffle functions. */ public enum StorageBucketHashType { + // CRC32 is for Doris internal storage bucket hash function STORAGE_BUCKET_CRC32, + // XXHASH64 is the default hash function for Doris computation layer STORAGE_BUCKET_XXHASH64, - STORAGE_BUCKET_SPARK_MURMUR32 + // SPARK_MURMUR32 is the hash function for Spark bucketed hive table's storage and computation + STORAGE_BUCKET_SPARK_MURMUR32; + + public THashType toThrift() { + switch (this) { + case STORAGE_BUCKET_CRC32: + return THashType.CRC32; + case STORAGE_BUCKET_SPARK_MURMUR32: + return THashType.SPARK_MURMUR32; + case STORAGE_BUCKET_XXHASH64: + default: + return THashType.XXHASH64; + } + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java index a0fa806ed2427b0..5162949ed796d18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java @@ -81,7 +81,7 @@ private DistributionSpec convertDistribution(LogicalFileScan fileScan) { } } StorageBucketHashType function = StorageBucketHashType.STORAGE_BUCKET_CRC32; - if (hmsExternalTable.isBucketedTable()) { + if (hmsExternalTable.isSparkBucketedTable()) { function = StorageBucketHashType.STORAGE_BUCKET_SPARK_MURMUR32; } return new DistributionSpecHash(hashColumns, DistributionSpecHash.ShuffleType.NATURAL, diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 4cc9608088cb81a..3ba8efa48a29a04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -625,11 +625,8 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr } PlanNode leftRoot = leftChildFragment.getPlanRoot(); - // 1.leftRoot be OlapScanNode - if (leftRoot instanceof OlapScanNode) { - return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs); - } else if (leftRoot instanceof HiveScanNode) { - return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType); + if (leftRoot instanceof ScanNode) { + return canBucketShuffleJoin(node, (ScanNode) leftRoot, rhsHashExprs, hashType); } // 2.leftRoot be hashjoin node @@ -637,17 +634,26 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr while (leftRoot instanceof HashJoinNode) { leftRoot = leftRoot.getChild(0); } - if (leftRoot instanceof OlapScanNode) { - return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs); - } else if (leftRoot instanceof HiveScanNode) { - return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType); + if (leftRoot instanceof ScanNode) { + return canBucketShuffleJoin(node, (ScanNode) leftRoot, rhsHashExprs, hashType); } } return false; } - private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNode, + private boolean
canBucketShuffleJoin(HashJoinNode node, ScanNode leftScanNode, + List rhsJoinExprs, Ref hashType) { + if (leftScanNode instanceof OlapScanNode) { + return canBucketShuffleJoinForOlap(node, (OlapScanNode) leftScanNode, rhsJoinExprs); + } else if (leftScanNode instanceof HiveScanNode) { + return canBucketShuffleJoinForHive(node, (HiveScanNode) leftScanNode, rhsJoinExprs, hashType); + } else { + return false; + } + } + + private boolean canBucketShuffleJoinForHive(HashJoinNode node, HiveScanNode leftScanNode, List rhsJoinExprs, Ref hashType) { HMSExternalTable leftTable = leftScanNode.getHiveTable(); @@ -713,7 +719,7 @@ private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNod } //the join expr must contian left table distribute column - private boolean canBucketShuffleJoin(HashJoinNode node, OlapScanNode leftScanNode, + private boolean canBucketShuffleJoinForOlap(HashJoinNode node, OlapScanNode leftScanNode, List rhsJoinExprs) { OlapTable leftTable = leftScanNode.getOlapTable(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 12aaea726f0adbe..25335ebc073f1d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2170,13 +2170,8 @@ protected void computeScanRangeAssignment() throws Exception { replicaNumPerHost, isEnableOrderedLocations); } if (fragmentContainsBucketShuffleJoin) { - if (scanNode instanceof OlapScanNode) { - bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, - idToBackend, addressToBackendID, replicaNumPerHost); - } else if (scanNode instanceof HiveScanNode) { - bucketShuffleJoinController.computeScanRangeAssignmentByBucket((HiveScanNode) scanNode, - idToBackend, addressToBackendID, replicaNumPerHost); - } + bucketShuffleJoinController.computeScanRangeAssignmentByBucket(scanNode, + idToBackend, addressToBackendID, replicaNumPerHost); } if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) { computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost, @@ -2654,8 +2649,21 @@ private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLoc this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort); } - // to ensure the same bucketSeq tablet to the same execHostPort private void computeScanRangeAssignmentByBucket( + final ScanNode scanNode, ImmutableMap idToBackend, + Map addressToBackendID, + Map replicaNumPerHost) throws Exception { + if (scanNode instanceof OlapScanNode) { + computeScanRangeAssignmentByBucketForOlap((OlapScanNode) scanNode, idToBackend, addressToBackendID, + replicaNumPerHost); + } else if (scanNode instanceof HiveScanNode) { + computeScanRangeAssignmentByBucketForHive((HiveScanNode) scanNode, idToBackend, addressToBackendID, + replicaNumPerHost); + } + } + + // to ensure the same bucketSeq tablet to the same execHostPort + private void computeScanRangeAssignmentByBucketForOlap( final OlapScanNode scanNode, ImmutableMap idToBackend, Map addressToBackendID, Map replicaNumPerHost) throws Exception { @@ -2695,13 +2703,13 @@ private void computeScanRangeAssignmentByBucket( } } - private void computeScanRangeAssignmentByBucket( + private void computeScanRangeAssignmentByBucketForHive( final HiveScanNode scanNode, ImmutableMap idToBackend, Map addressToBackendID, Map replicaNumPerHost) throws Exception { if 
(!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) { int bucketNum = 0; - if (scanNode.getHiveTable().isBucketedTable()) { + if (scanNode.getHiveTable().isSparkBucketedTable()) { bucketNum = scanNode.getHiveTable().getDefaultDistributionInfo().getBucketNum(); } else { throw new NotImplementedException("bucket shuffle for non-bucketed table not supported");
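For reference, a minimal, self-contained sketch of the StorageBucketHashType-to-THashType mapping added in DistributionSpecHash above. The nested THashType enum here is a hypothetical stand-in for the thrift-generated org.apache.doris.thrift.THashType, declared only so the example compiles on its own; the mapping itself follows the patch (CRC32 for Doris internal storage buckets, SPARK_MURMUR32 for Spark-bucketed Hive tables, XXHASH64 as the default for the computation layer).

// Self-contained illustration, not Doris source.
public class StorageBucketHashTypeSketch {

    // Hypothetical stand-in for the thrift-generated org.apache.doris.thrift.THashType.
    enum THashType { CRC32, XXHASH64, SPARK_MURMUR32 }

    // Mirrors the toThrift() mapping the patch adds to DistributionSpecHash.StorageBucketHashType.
    enum StorageBucketHashType {
        STORAGE_BUCKET_CRC32,          // Doris internal storage bucket hash function
        STORAGE_BUCKET_XXHASH64,       // default hash for the computation layer
        STORAGE_BUCKET_SPARK_MURMUR32; // Spark-bucketed Hive table storage/computation hash

        THashType toThrift() {
            switch (this) {
                case STORAGE_BUCKET_CRC32:
                    return THashType.CRC32;
                case STORAGE_BUCKET_SPARK_MURMUR32:
                    return THashType.SPARK_MURMUR32;
                case STORAGE_BUCKET_XXHASH64:
                default:
                    return THashType.XXHASH64;
            }
        }
    }

    public static void main(String[] args) {
        // Prints each planner-side hash function with the thrift value it maps to.
        for (StorageBucketHashType t : StorageBucketHashType.values()) {
            System.out.println(t + " -> " + t.toThrift());
        }
    }
}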
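Likewise, a minimal sketch of the dispatch shape the patch applies in DistributedPlanner.canBucketShuffleJoin and Coordinator.computeScanRangeAssignmentByBucket: a single entry point takes the ScanNode base type and routes to a *ForOlap or *ForHive helper, returning false (or doing nothing) for any other scan node. The classes below are simplified stand-ins, not the real planner types, and the two helpers are stubs.

// Self-contained illustration, not Doris source.
public class BucketShuffleDispatchSketch {

    // Simplified stand-ins for the real scan node hierarchy.
    static class ScanNode { }
    static class OlapScanNode extends ScanNode { }
    static class HiveScanNode extends ScanNode { }

    // Single entry point dispatching on the concrete scan node type,
    // as the renamed methods in the patch do.
    static boolean canBucketShuffleJoin(ScanNode leftScanNode) {
        if (leftScanNode instanceof OlapScanNode) {
            return canBucketShuffleJoinForOlap((OlapScanNode) leftScanNode);
        } else if (leftScanNode instanceof HiveScanNode) {
            return canBucketShuffleJoinForHive((HiveScanNode) leftScanNode);
        } else {
            return false; // other scan node types cannot bucket-shuffle
        }
    }

    // Stubs; the real checks inspect distribution columns and join expressions.
    static boolean canBucketShuffleJoinForOlap(OlapScanNode node) { return true; }
    static boolean canBucketShuffleJoinForHive(HiveScanNode node) { return true; }

    public static void main(String[] args) {
        System.out.println(canBucketShuffleJoin(new OlapScanNode())); // true
        System.out.println(canBucketShuffleJoin(new HiveScanNode())); // true
        System.out.println(canBucketShuffleJoin(new ScanNode()));     // false
    }
}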