Skip to content

Commit

Permalink
[opt] refine some method name
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman authored and Nitin-Kashyap committed Dec 13, 2024
1 parent 994d239 commit f49d57e
Show file tree
Hide file tree
Showing 11 changed files with 73 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,11 @@ public class HiveExternalDistributionInfo extends HashDistributionInfo {
@SerializedName(value = "bucketingVersion")
private final int bucketingVersion;

public HiveExternalDistributionInfo() {
bucketingVersion = 2;
}

public HiveExternalDistributionInfo(int bucketNum, List<Column> distributionColumns, int bucketingVersion) {
super(bucketNum, distributionColumns);
this.bucketingVersion = bucketingVersion;
}

public HiveExternalDistributionInfo(int bucketNum, boolean autoBucket,
List<Column> distributionColumns, int bucketingVersion) {
super(bucketNum, autoBucket, distributionColumns);
this.bucketingVersion = bucketingVersion;
}

public int getBucketingVersion() {
return bucketingVersion;
}


@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ public void init(BeSelectionPolicy policy) throws UserException {
} catch (ExecutionException e) {
throw new UserException("failed to get consistent hash", e);
}
/*consistentBucket = new ConsistentHash<>(Hashing.murmur3_128(), new BucketHash(),
new BackendHash(), backends, Config.virtual_node_number);*/
}

public Backend getNextBe() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,12 @@ public void createScanRangeLocations() throws UserException {
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys,
false, isACID) : fileSplit.getPartitionValues();
boolean isBucketedHiveTable = false;
boolean isSparkBucketedHiveTable = false;
int bucketNum = 0;
TableIf targetTable = getTargetTable();
if (targetTable instanceof HMSExternalTable) {
isBucketedHiveTable = ((HMSExternalTable) targetTable).isBucketedTable();
if (isBucketedHiveTable) {
isSparkBucketedHiveTable = ((HMSExternalTable) targetTable).isSparkBucketedTable();
if (isSparkBucketedHiveTable) {
bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getName()).getAsInt();
}
}
Expand Down Expand Up @@ -422,7 +422,7 @@ >>>>>>> a5ce2395a2 ([feature](datalake) Add BucketShuffleJoin support for Hive t
fileSplit.getStart(), fileSplit.getLength(),
Joiner.on("|").join(fileSplit.getHosts()));
}
if (isBucketedHiveTable) {
if (isSparkBucketedHiveTable) {
bucketSeq2locations.put(bucketNum, curLocations);
}
>>>>>>> a5ce2395a2 ([feature](datalake) Add BucketShuffleJoin support for Hive table data generated by Spark. (27783))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hive.HiveBucketUtil.HiveBucketType;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
Expand Down Expand Up @@ -178,7 +179,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
protected volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
protected List<Column> partitionColumns;
private List<Column> bucketColumns;
private boolean isSparkTable;
private HiveBucketType hiveBucketType = HiveBucketType.NONE;

private DLAType dlaType = DLAType.UNKNOWN;

Expand Down Expand Up @@ -268,12 +269,8 @@ public boolean isHoodieCowTable() {
|| (params != null && "COPY_ON_WRITE".equalsIgnoreCase(params.get("flink.table.type")));
}

public boolean isSparkTable() {
return isSparkTable;
}

public boolean isBucketedTable() {
return bucketColumns != null && !bucketColumns.isEmpty() && isSparkTable;
public boolean isSparkBucketedTable() {
return bucketColumns != null && !bucketColumns.isEmpty() && hiveBucketType == HiveBucketType.SPARK;
}

/**
Expand Down Expand Up @@ -582,7 +579,7 @@ public Optional<SchemaCacheValue> initSchema() {
private void initBucketingColumns(List<Column> columns) {
List<String> bucketCols = new ArrayList<>(5);
int numBuckets = getBucketColumns(bucketCols);
if (bucketCols.isEmpty() || !isSparkTable) {
if (bucketCols.isEmpty() || hiveBucketType != HiveBucketType.SPARK) {
bucketColumns = ImmutableList.of();
distributionInfo = new RandomDistributionInfo(1, true);
return;
Expand Down Expand Up @@ -619,6 +616,7 @@ private int getBucketColumns(List<String> bucketCols) {
/* Hive Bucketed Table */
bucketCols.addAll(descriptor.getBucketCols());
numBuckets = descriptor.getNumBuckets();
hiveBucketType = HiveBucketType.HIVE;
} else if (remoteTable.isSetParameters()
&& !Collections.disjoint(SUPPORTED_BUCKET_PROPERTIES, remoteTable.getParameters().keySet())) {
Map<String, String> parameters = remoteTable.getParameters();
Expand All @@ -633,7 +631,7 @@ private int getBucketColumns(List<String> bucketCols) {
}

if (numBuckets > 0) {
isSparkTable = true;
hiveBucketType = HiveBucketType.SPARK;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@
public class HiveBucketUtil {
private static final Logger LOG = LogManager.getLogger(HiveBucketUtil.class);

public enum HiveBucketType {
NONE,
HIVE,
SPARK
}

private static final Set<PrimitiveType> SUPPORTED_TYPES_FOR_BUCKET_FILTER = ImmutableSet.of(
PrimitiveType.BOOLEAN,
PrimitiveType.TINYINT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws User

@Override
public DataPartition constructInputPartitionByDistributionInfo() {
if (hmsTable.isBucketedTable()) {
if (hmsTable.isSparkBucketedTable()) {
DistributionInfo distributionInfo = hmsTable.getDefaultDistributionInfo();
if (!(distributionInfo instanceof HashDistributionInfo)) {
return DataPartition.RANDOM;
Expand All @@ -526,7 +526,7 @@ public HMSExternalTable getHiveTable() {

@Override
public THashType getHashType() {
if (hmsTable.isBucketedTable()
if (hmsTable.isSparkBucketedTable()
&& hmsTable.getDefaultDistributionInfo() instanceof HashDistributionInfo) {
return THashType.SPARK_MURMUR32;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2632,17 +2632,7 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec,
THashType hashType = THashType.XXHASH64;
switch (distributionSpecHash.getShuffleType()) {
case STORAGE_BUCKETED:
switch (distributionSpecHash.getShuffleFunction()) {
case STORAGE_BUCKET_SPARK_MURMUR32:
hashType = THashType.SPARK_MURMUR32;
break;
case STORAGE_BUCKET_CRC32:
hashType = THashType.CRC32;
break;
case STORAGE_BUCKET_XXHASH64:
default:
break;
}
hashType = distributionSpecHash.getShuffleFunction().toThrift();
partitionType = TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED;
break;
case EXECUTION_BUCKETED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.nereids.annotation.Developing;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.thrift.THashType;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -338,9 +339,24 @@ public enum ShuffleType {
* Enums for concrete shuffle functions.
*/
public enum StorageBucketHashType {
// CRC32 is for Doris internal storage bucket hash function
STORAGE_BUCKET_CRC32,
// XXHASH64 is the default hash function for Doris computation layer
STORAGE_BUCKET_XXHASH64,
STORAGE_BUCKET_SPARK_MURMUR32
// SPARK_MURMUR32 is the hash function for Spark bucketed hive table's storage and computation
STORAGE_BUCKET_SPARK_MURMUR32;

public THashType toThrift() {
switch (this) {
case STORAGE_BUCKET_CRC32:
return THashType.CRC32;
case STORAGE_BUCKET_SPARK_MURMUR32:
return THashType.SPARK_MURMUR32;
case STORAGE_BUCKET_XXHASH64:
default:
return THashType.XXHASH64;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private DistributionSpec convertDistribution(LogicalFileScan fileScan) {
}
}
StorageBucketHashType function = StorageBucketHashType.STORAGE_BUCKET_CRC32;
if (hmsExternalTable.isBucketedTable()) {
if (hmsExternalTable.isSparkBucketedTable()) {
function = StorageBucketHashType.STORAGE_BUCKET_SPARK_MURMUR32;
}
return new DistributionSpecHash(hashColumns, DistributionSpecHash.ShuffleType.NATURAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,29 +625,35 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr
}

PlanNode leftRoot = leftChildFragment.getPlanRoot();
// 1.leftRoot be OlapScanNode
if (leftRoot instanceof OlapScanNode) {
return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs);
} else if (leftRoot instanceof HiveScanNode) {
return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType);
if (leftRoot instanceof ScanNode) {
return canBucketShuffleJoin(node, (ScanNode) leftRoot, rhsHashExprs, hashType);
}

// 2.leftRoot be hashjoin node
if (leftRoot instanceof HashJoinNode) {
while (leftRoot instanceof HashJoinNode) {
leftRoot = leftRoot.getChild(0);
}
if (leftRoot instanceof OlapScanNode) {
return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs);
} else if (leftRoot instanceof HiveScanNode) {
return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType);
if (leftRoot instanceof ScanNode) {
canBucketShuffleJoin(node, (ScanNode) leftRoot, rhsHashExprs, hashType);
}
}

return false;
}

private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNode,
private boolean canBucketShuffleJoin(HashJoinNode node, ScanNode leftScanNode,
List<Expr> rhsJoinExprs, Ref<THashType> hashType) {
if (leftScanNode instanceof OlapScanNode) {
return canBucketShuffleJoinForOlap(node, (OlapScanNode) leftScanNode, rhsJoinExprs);
} else if (leftScanNode instanceof HiveScanNode) {
return canBucketShuffleJoinForHive(node, (HiveScanNode) leftScanNode, rhsJoinExprs, hashType);
} else {
return false;
}
}

private boolean canBucketShuffleJoinForHive(HashJoinNode node, HiveScanNode leftScanNode,
List<Expr> rhsJoinExprs, Ref<THashType> hashType) {
HMSExternalTable leftTable = leftScanNode.getHiveTable();

Expand Down Expand Up @@ -713,7 +719,7 @@ private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNod
}

//the join expr must contian left table distribute column
private boolean canBucketShuffleJoin(HashJoinNode node, OlapScanNode leftScanNode,
private boolean canBucketShuffleJoinForOlap(HashJoinNode node, OlapScanNode leftScanNode,
List<Expr> rhsJoinExprs) {
OlapTable leftTable = leftScanNode.getOlapTable();

Expand Down
28 changes: 18 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -2170,13 +2170,8 @@ protected void computeScanRangeAssignment() throws Exception {
replicaNumPerHost, isEnableOrderedLocations);
}
if (fragmentContainsBucketShuffleJoin) {
if (scanNode instanceof OlapScanNode) {
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode,
idToBackend, addressToBackendID, replicaNumPerHost);
} else if (scanNode instanceof HiveScanNode) {
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((HiveScanNode) scanNode,
idToBackend, addressToBackendID, replicaNumPerHost);
}
bucketShuffleJoinController.computeScanRangeAssignmentByBucket(scanNode,
idToBackend, addressToBackendID, replicaNumPerHost);
}
if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) {
computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost,
Expand Down Expand Up @@ -2654,8 +2649,21 @@ private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLoc
this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
}

// to ensure the same bucketSeq tablet to the same execHostPort
private void computeScanRangeAssignmentByBucket(
final ScanNode scanNode, ImmutableMap<Long, Backend> idToBackend,
Map<TNetworkAddress, Long> addressToBackendID,
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
if (scanNode instanceof OlapScanNode) {
computeScanRangeAssignmentByBucketForOlap((OlapScanNode) scanNode, idToBackend, addressToBackendID,
replicaNumPerHost);
} else if (scanNode instanceof HiveScanNode) {
computeScanRangeAssignmentByBucketForHive((HiveScanNode) scanNode, idToBackend, addressToBackendID,
replicaNumPerHost);
}
}

// to ensure the same bucketSeq tablet to the same execHostPort
private void computeScanRangeAssignmentByBucketForOlap(
final OlapScanNode scanNode, ImmutableMap<Long, Backend> idToBackend,
Map<TNetworkAddress, Long> addressToBackendID,
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
Expand Down Expand Up @@ -2695,13 +2703,13 @@ private void computeScanRangeAssignmentByBucket(
}
}

private void computeScanRangeAssignmentByBucket(
private void computeScanRangeAssignmentByBucketForHive(
final HiveScanNode scanNode, ImmutableMap<Long, Backend> idToBackend,
Map<TNetworkAddress, Long> addressToBackendID,
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
int bucketNum = 0;
if (scanNode.getHiveTable().isBucketedTable()) {
if (scanNode.getHiveTable().isSparkBucketedTable()) {
bucketNum = scanNode.getHiveTable().getDefaultDistributionInfo().getBucketNum();
} else {
throw new NotImplementedException("bucket shuffle for non-bucketed table not supported");
Expand Down

0 comments on commit f49d57e

Please sign in to comment.