From 5e16492b39830dea89fd22118b175efaa989258e Mon Sep 17 00:00:00 2001 From: Nitin Kashyap Date: Fri, 5 Jul 2024 10:52:16 +0530 Subject: [PATCH] fix local_exchange --- be/src/pipeline/dependency.h | 4 +++- .../partitioned_hash_join_sink_operator.cpp | 2 ++ .../exec/partitioned_hash_join_sink_operator.h | 4 +++- be/src/vec/columns/column_string.cpp | 3 ++- be/src/vec/runtime/partitioner.cpp | 4 ++-- be/src/vec/runtime/partitioner.h | 6 +++--- .../doris/datasource/FileQueryScanNode.java | 11 ++++++----- .../datasource/hive/HMSExternalTable.java | 5 ++--- .../translator/PhysicalPlanTranslator.java | 2 ++ .../properties/DistributionSpecHash.java | 18 ++++++++++++++++++ .../doris/planner/DistributedPlanner.java | 2 ++ .../org/apache/doris/planner/HashJoinNode.java | 7 +++++++ .../java/org/apache/doris/qe/Coordinator.java | 1 + gensrc/thrift/PlanNodes.thrift | 1 + 14 files changed, 54 insertions(+), 16 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 71ef2d8b8fd85f6..860ed82d06a63e3 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -743,7 +743,8 @@ inline std::string get_exchange_type_name(ExchangeType idx) { } struct DataDistribution { - DataDistribution(ExchangeType type) : distribution_type(type) {} + DataDistribution(ExchangeType type) : distribution_type(type), hash_type(THashType::CRC32) {} DataDistribution(ExchangeType type, const std::vector& partition_exprs_) : distribution_type(type), partition_exprs(partition_exprs_) {} + DataDistribution(ExchangeType type, const std::vector& partition_exprs_, THashType::type hash_type_) : distribution_type(type), partition_exprs(partition_exprs_), hash_type(hash_type_) {} DataDistribution(const DataDistribution& other) = default; @@ -751,6 +752,7 @@ struct DataDistribution { DataDistribution& operator=(const DataDistribution& other) = default; ExchangeType distribution_type; std::vector partition_exprs; + THashType::type hash_type = THashType::CRC32; }; class ExchangerBase; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index d221eaeed0faba4..55df66ac12fcaa6 100644 --- 
a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -402,6 +402,8 @@ PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(ObjectPool* p descs), _join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type : TJoinDistributionType::NONE), + _hash_type(tnode.hash_join_node.__isset.hash_type ? tnode.hash_join_node.hash_type + : THashType::CRC32), _distribution_partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[1] : std::vector {}), diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index d1fe30e06f2dd2c..36637dc973e496d 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -109,7 +109,8 @@ class PartitionedHashJoinSinkOperatorX return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, - _distribution_partition_exprs) + _distribution_partition_exprs, + _hash_type) : DataDistribution(ExchangeType::HASH_SHUFFLE, _distribution_partition_exprs); } @@ -134,6 +135,7 @@ class PartitionedHashJoinSinkOperatorX Status _setup_internal_operator(RuntimeState* state); const TJoinDistributionType::type _join_distribution; + THashType::type _hash_type; std::vector _build_exprs; diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 931608593cb852c..049930b79c611b2 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -286,7 +286,8 @@ void ColumnStr::update_crcs_with_value(uint32_t* __restrict hashes, doris::Pr } } -void ColumnString::update_murmurs_with_value(int32_t* __restrict hashes, doris::PrimitiveType type, +template +void ColumnStr::update_murmurs_with_value(int32_t* __restrict hashes, doris::PrimitiveType type, int32_t rows, uint32_t offset, const uint8_t* __restrict null_data) const { auto s = rows; diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp index bd3b8bf8edbd49c..188918bf9913e8e 100644 --- a/be/src/vec/runtime/partitioner.cpp +++ b/be/src/vec/runtime/partitioner.cpp @@ -93,8 +93,8 @@ Status Murmur32HashPartitioner::clone(RuntimeState* state, } template -int32_t Murmur32HashPartitioner::_get_default_seed() { - return static_cast(HashUtil::SPARK_MURMUR_32_SEED); +int32_t Murmur32HashPartitioner::_get_default_seed() const { + return static_cast(HashUtil::SPARK_MURMUR_32_SEED); } template class Crc32HashPartitioner; diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h index ade77467d37c7b6..7dd5d295b4e6623 100644 --- a/be/src/vec/runtime/partitioner.h +++ b/be/src/vec/runtime/partitioner.h @@ -91,8 +91,8 @@ class Crc32HashPartitioner : public PartitionerBase { void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const; - 
HashValueType _get_default_seed() { - return 0; + HashValueType _get_default_seed() const { + return 0; } VExprContextSPtrs _partition_expr_ctxs; @@ -123,7 +123,7 @@ class Murmur32HashPartitioner final : public Partitioner { Status clone(RuntimeState* state, std::unique_ptr& partitioner) override; - int32_t _get_default_seed() override; + int32_t _get_default_seed() const; private: void _do_hash(const ColumnPtr& column, int32_t* __restrict result, int idx) const override; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index a07ad5b26d727a4..901bf36f5ae3ee4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -432,6 +432,10 @@ private TScanRangeLocations splitToScanRange( isSparkBucketedHiveTable = ((HMSExternalTable) targetTable).isSparkBucketedTable(); if (isSparkBucketedHiveTable) { bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getName()).getAsInt(); + if (!bucketSeq2locations.containsKey(bucketNum)) { + bucketSeq2locations.put(bucketNum, curLocations); + } + curLocations = bucketSeq2locations.get(bucketNum).get(0); } } @@ -475,13 +479,10 @@ private TScanRangeLocations splitToScanRange( curLocations.addToLocations(location); if (LOG.isDebugEnabled()) { - LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", + LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}, bucketNum: {}", curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(), fileSplit.getLength(), - Joiner.on("|").join(fileSplit.getHosts())); - } - if (isSparkBucketedHiveTable) { - bucketSeq2locations.put(bucketNum, curLocations); + Joiner.on("|").join(fileSplit.getHosts()), bucketNum); } return curLocations; diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 087d605ee434f7a..087a24257d8f03d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -59,6 +59,7 @@ import org.apache.doris.thrift.TTableType; import com.google.common.collect.BiMap; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -80,10 +81,8 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.time.LocalDate; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 2baf3b3dd530e27..7e2416cb2d5eabe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1423,6 +1423,8 @@ public PlanFragment visitPhysicalHashJoin( hashJoinNode.setDistributionMode(DistributionMode.BROADCAST); } else if (JoinUtils.shouldBucketShuffleJoin(physicalHashJoin)) { hashJoinNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE); + hashJoinNode.setHashType(((DistributionSpecHash) physicalHashJoin.left() + .getPhysicalProperties().getDistributionSpec()).getShuffleFunction()); } else { hashJoinNode.setDistributionMode(DistributionMode.PARTITIONED); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java index 3bd1ab1ae529647..f5283e7b35eb048 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java @@ -346,6 +346,9 @@ public enum StorageBucketHashType { // SPARK_MURMUR32 is the hash function for Spark bucketed hive table's storage and computation STORAGE_BUCKET_SPARK_MURMUR32; + /** + * convert to thrift + */ public THashType toThrift() { switch (this) { case STORAGE_BUCKET_CRC32: @@ -357,6 +360,21 @@ public THashType toThrift() { return THashType.XXHASH64; } } + + /** + * convert from thrift + */ + public static StorageBucketHashType fromThrift(THashType hashType) { + switch (hashType) { + case CRC32: + return STORAGE_BUCKET_CRC32; + case SPARK_MURMUR32: + return STORAGE_BUCKET_SPARK_MURMUR32; + case XXHASH64: + default: + return STORAGE_BUCKET_XXHASH64; + } + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 3ba8efa48a29a04..70922102d6b8718 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -41,6 +41,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.source.HiveScanNode; +import org.apache.doris.nereids.properties.DistributionSpecHash; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TPartitionType; @@ -339,6 +340,7 @@ private PlanFragment createHashJoinFragment( Ref hashType = Ref.from(THashType.CRC32); if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionExprs, hashType)) { 
node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE); + node.setHashType(DistributionSpecHash.StorageBucketHashType.fromThrift(hashType.value)); DataPartition rhsJoinPartition = new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionExprs, hashType.value); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index c3cbf2afce15ac4..12b6481720ea6ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -37,6 +37,7 @@ import org.apache.doris.common.CheckedMath; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.nereids.properties.DistributionSpecHash; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TEqJoinCondition; @@ -79,6 +80,7 @@ public class HashJoinNode extends JoinNodeBase { private List markJoinConjuncts; private DistributionMode distrMode; + private DistributionSpecHash.StorageBucketHashType hashType; private boolean isColocate = false; //the flag for colocate join private String colocateReason = ""; // if can not do colocate join, set reason here @@ -249,6 +251,10 @@ public void setColocate(boolean colocate, String reason) { colocateReason = reason; } + public void setHashType(DistributionSpecHash.StorageBucketHashType hashType) { + this.hashType = hashType; + } + /** * Calculate the slots output after going through the hash table in the hash join node. * The most essential difference between 'hashOutputSlots' and 'outputSlots' is that @@ -817,6 +823,7 @@ protected void toThrift(TPlanNode msg) { } } msg.hash_join_node.setDistType(isColocate ? 
TJoinDistributionType.COLOCATE : distrMode.toThrift()); + if (hashType != null) { msg.hash_join_node.setHashType(hashType.toThrift()); } msg.hash_join_node.setUseSpecificProjections(useSpecificProjections); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 731aeddf6d31673..e9d7fc845c00e79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2722,6 +2722,7 @@ private void computeScanRangeAssignmentByBucketForHive( fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>()); fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange()); fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>()); + scanNode.getFragment().setBucketNum(bucketNum); } Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index bd8c43622d1f608..383052ce927a5cd 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -864,6 +864,7 @@ struct THashJoinNode { 13: optional list mark_join_conjuncts // use_specific_projections true, if output exprssions is denoted by srcExprList represents, o.w. PlanNode.projections 14: optional bool use_specific_projections + 15: optional Partitions.THashType hash_type } struct TNestedLoopJoinNode {