Skip to content

Commit

Permalink
fix by comments
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jun 27, 2024
1 parent 2e673c5 commit e007125
Show file tree
Hide file tree
Showing 45 changed files with 195 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ public synchronized void updateSummary(long startTime, Map<String, String> summa
summaryInfo.put(SummaryProfile.PHYSICAL_PLAN,
builder.toString().replace("\n", "\n "));


FragmentIdMapping<DistributedPlan> distributedPlans = nereidsPlanner.getDistributedPlans();
if (distributedPlans != null) {
summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ private boolean couldNotRightBucketShuffleJoin(JoinType joinType, DistributionSp
|| joinType == JoinType.FULL_OUTER_JOIN);
boolean isSpecInScope = (leftHashSpec.getShuffleType() == ShuffleType.NATURAL
|| rightHashSpec.getShuffleType() == ShuffleType.NATURAL);
return isJoinTypeInScope && isSpecInScope;
return isJoinTypeInScope && isSpecInScope && !SessionVariable.canUseNereidsDistributePlanner();
}

@Override
Expand Down Expand Up @@ -249,8 +249,7 @@ public Boolean visitPhysicalHashJoin(
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
// check colocate join with scan
return true;
} else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), leftHashSpec, rightHashSpec)
&& !SessionVariable.canUseNereidsDistributePlanner()) {
} else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), leftHashSpec, rightHashSpec)) {
// right anti, right outer, full outer join could not do bucket shuffle join
// TODO remove this after we refactor coordinator
updatedForLeft = Optional.of(calAnotherSideRequired(
Expand Down Expand Up @@ -307,6 +306,9 @@ public Boolean visitPhysicalHashJoin(
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL) {
if (SessionVariable.canUseNereidsDistributePlanner()) {
// nereids coordinator can exchange left side to right side to do bucket shuffle join
// TODO: maybe we should check if left child is PhysicalDistribute.
// If so add storage bucketed shuffle on left side. Otherwise,
// add execution bucketed shuffle on right side.
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
import org.apache.doris.catalog.Type;
import org.apache.doris.common.NereidsException;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.BigIntType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.Statistics;
Expand Down Expand Up @@ -95,14 +93,6 @@ public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitNumbers(this, context);
}

@Override
public PhysicalProperties getPhysicalProperties() {
    // With the Nereids distribute planner enabled, relax the required physical
    // property to ANY; otherwise defer to the superclass default.
    // NOTE(review): this override is removed by this commit — the equivalent
    // check now lives in the base table-valued-function implementation.
    if (SessionVariable.canUseNereidsDistributePlanner()) {
        return PhysicalProperties.ANY;
    }
    return super.getPhysicalProperties();
}

@Override
public Numbers withChildren(List<Expression> children) {
Preconditions.checkArgument(children().size() == 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
Expand Down Expand Up @@ -113,6 +114,9 @@ public boolean nullable() {
}

/**
 * Returns the physical properties required by this table-valued function:
 * ANY when the Nereids distribute planner may be used, STORAGE_ANY otherwise.
 */
public PhysicalProperties getPhysicalProperties() {
    return SessionVariable.canUseNereidsDistributePlanner()
            ? PhysicalProperties.ANY
            : PhysicalProperties.STORAGE_ANY;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;
package org.apache.doris.nereids.trees.plans.distribute;

import org.apache.doris.nereids.worker.job.BucketScanSource;
import org.apache.doris.nereids.worker.job.WorkerScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.WorkerScanSource;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;
package org.apache.doris.nereids.trees.plans.distribute;

import org.apache.doris.nereids.worker.job.DefaultScanSource;
import org.apache.doris.nereids.worker.job.WorkerScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.WorkerScanSource;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.doris.nereids.trees.plans.distribute;

import org.apache.doris.nereids.worker.job.AssignedJob;
import org.apache.doris.nereids.worker.job.AssignedJobBuilder;
import org.apache.doris.nereids.worker.job.UnassignedJob;
import org.apache.doris.nereids.worker.job.UnassignedJobBuilder;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJobBuilder;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.doris.nereids.trees.AbstractTreeNode;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.nereids.worker.job.UnassignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;

import java.util.List;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;
package org.apache.doris.nereids.trees.plans.distribute;

import org.apache.doris.nereids.worker.Worker;
import org.apache.doris.nereids.worker.job.AssignedJob;
import org.apache.doris.nereids.worker.job.ScanSource;
import org.apache.doris.nereids.worker.job.StaticAssignedJob;
import org.apache.doris.nereids.worker.job.UnassignedJob;
import org.apache.doris.nereids.worker.job.WorkerScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.StaticAssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.WorkerScanSource;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TUniqueId;

Expand All @@ -46,7 +46,7 @@ public List<AssignedJob> buildAssignedJobs(UnassignedJob unassignedJob) {
ConnectContext context = ConnectContext.get();
for (WorkerScanSource<S> workerToScanSource : workerScanSources) {
TUniqueId instanceId = context.nextInstanceId();
Worker worker = workerToScanSource.worker;
DistributedPlanWorker worker = workerToScanSource.worker;
ScanSource scanSource = workerToScanSource.scanSource;
StaticAssignedJob assignedJob = new StaticAssignedJob(
instanceNum++, instanceId, unassignedJob, worker, scanSource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.doris.nereids.trees.plans.distribute;

import org.apache.doris.nereids.util.Utils;
import org.apache.doris.nereids.worker.job.AssignedJob;
import org.apache.doris.nereids.worker.job.UnassignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
import org.apache.doris.thrift.TExplainLevel;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.worker;
package org.apache.doris.nereids.trees.plans.distribute.worker;

import org.apache.doris.system.Backend;

import java.util.Objects;

/** BackendWorker */
public class BackendWorker implements Worker {
public class BackendWorker implements DistributedPlanWorker {
private final Backend backend;

public BackendWorker(Backend backend) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.worker;
package org.apache.doris.nereids.trees.plans.distribute.worker;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.NereidsException;
Expand All @@ -39,7 +39,7 @@ public class BackendWorkerManager implements WorkerManager {
});

@Override
public Worker getWorker(long backendId) {
public DistributedPlanWorker getWorker(long backendId) {
ImmutableMap<Long, Backend> backends = this.backends.get();
Backend backend = backends.get(backendId);
if (backend == null) {
Expand All @@ -49,7 +49,7 @@ public Worker getWorker(long backendId) {
}

@Override
public Worker randomAvailableWorker() {
public DistributedPlanWorker randomAvailableWorker() {
try {
Reference<Long> selectedBackendId = new Reference<>();
ImmutableMap<Long, Backend> backends = this.backends.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.worker;
package org.apache.doris.nereids.trees.plans.distribute.worker;

/** Worker */
public interface Worker extends Comparable<Worker> {
/**
* DistributedPlanWorker: a worker who can execute the assigned job(instance) of the DistributedPlan
*/
public interface DistributedPlanWorker extends Comparable<DistributedPlanWorker> {
long id();

// ipv4/ipv6 address
Expand All @@ -32,7 +34,7 @@ public interface Worker extends Comparable<Worker> {
boolean available();

@Override
default int compareTo(Worker worker) {
default int compareTo(DistributedPlanWorker worker) {
return address().compareTo(worker.address());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.worker;
package org.apache.doris.nereids.trees.plans.distribute.worker;

import org.apache.doris.common.Pair;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.worker.job.BucketScanSource;
import org.apache.doris.nereids.worker.job.DefaultScanSource;
import org.apache.doris.nereids.worker.job.ScanRanges;
import org.apache.doris.nereids.worker.job.UnassignedJob;
import org.apache.doris.nereids.worker.job.UnassignedScanBucketOlapTableJob;
import org.apache.doris.nereids.worker.job.UninstancedScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.UninstancedScanSource;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
Expand All @@ -51,18 +51,18 @@
/** LoadBalanceScanWorkerSelector */
public class LoadBalanceScanWorkerSelector implements ScanWorkerSelector {
private final BackendWorkerManager workerManager = new BackendWorkerManager();
private final Map<Worker, WorkerWorkload> workloads = Maps.newLinkedHashMap();
private final Map<DistributedPlanWorker, WorkerWorkload> workloads = Maps.newLinkedHashMap();

@Override
public WorkerManager getWorkerManager() {
return workerManager;
}

@Override
public Worker selectMinWorkloadWorker(List<Worker> workers) {
Worker minWorkloadWorker = null;
public DistributedPlanWorker selectMinWorkloadWorker(List<DistributedPlanWorker> workers) {
DistributedPlanWorker minWorkloadWorker = null;
WorkerWorkload minWorkload = new WorkerWorkload(Integer.MAX_VALUE, Long.MAX_VALUE);
for (Worker worker : workers) {
for (DistributedPlanWorker worker : workers) {
WorkerWorkload workload = getWorkload(worker);
if (minWorkload.compareTo(workload) > 0) {
minWorkloadWorker = worker;
Expand All @@ -74,8 +74,8 @@ public Worker selectMinWorkloadWorker(List<Worker> workers) {
}

@Override
public Map<Worker, UninstancedScanSource> selectReplicaAndWorkerWithoutBucket(ScanNode scanNode) {
Map<Worker, UninstancedScanSource> workerScanRanges = Maps.newLinkedHashMap();
public Map<DistributedPlanWorker, UninstancedScanSource> selectReplicaAndWorkerWithoutBucket(ScanNode scanNode) {
Map<DistributedPlanWorker, UninstancedScanSource> workerScanRanges = Maps.newLinkedHashMap();
// allScanRangesLocations is all scan ranges in all partition which need to scan
List<TScanRangeLocations> allScanRangesLocations = scanNode.getScanRangeLocations(0);
for (TScanRangeLocations onePartitionOneScanRangeLocation : allScanRangesLocations) {
Expand All @@ -97,7 +97,7 @@ public Map<Worker, UninstancedScanSource> selectReplicaAndWorkerWithoutBucket(Sc
}

@Override
public Map<Worker, UninstancedScanSource> selectReplicaAndWorkerWithBucket(
public Map<DistributedPlanWorker, UninstancedScanSource> selectReplicaAndWorkerWithBucket(
UnassignedScanBucketOlapTableJob unassignedJob) {
PlanFragment fragment = unassignedJob.getFragment();
List<ScanNode> scanNodes = unassignedJob.getScanNodes();
Expand Down Expand Up @@ -142,19 +142,19 @@ private Function<ScanNode, Map<Integer, Long>> bucketBytesSupplier() {
};
}

private Map<Worker, UninstancedScanSource> selectForBucket(
private Map<DistributedPlanWorker, UninstancedScanSource> selectForBucket(
UnassignedJob unassignedJob, List<ScanNode> scanNodes,
BiFunction<ScanNode, Integer, List<TScanRangeLocations>> bucketScanRangeSupplier,
Function<ScanNode, Map<Integer, Long>> bucketBytesSupplier) {
Map<Worker, UninstancedScanSource> assignment = Maps.newLinkedHashMap();
Map<DistributedPlanWorker, UninstancedScanSource> assignment = Maps.newLinkedHashMap();

Map<Integer, Long> bucketIndexToBytes = computeEachBucketScanBytes(scanNodes, bucketBytesSupplier);

for (Entry<Integer, Long> kv : bucketIndexToBytes.entrySet()) {
Integer bucketIndex = kv.getKey();
long allScanNodeScanBytesInOneBucket = kv.getValue();

Worker selectedWorker = null;
DistributedPlanWorker selectedWorker = null;
for (ScanNode scanNode : scanNodes) {
List<TScanRangeLocations> allPartitionTabletsInOneBucketInOneTable
= bucketScanRangeSupplier.apply(scanNode, bucketIndex);
Expand Down Expand Up @@ -201,12 +201,12 @@ private WorkerScanRanges selectScanReplicaAndMinWorkloadWorker(
List<TScanRangeLocation> replicaLocations = tabletLocation.getLocations();
int replicaNum = replicaLocations.size();
WorkerWorkload minWorkload = new WorkerWorkload(Integer.MAX_VALUE, Long.MAX_VALUE);
Worker minWorkLoadWorker = null;
DistributedPlanWorker minWorkLoadWorker = null;
TScanRangeLocation selectedReplicaLocation = null;

for (int i = 0; i < replicaNum; i++) {
TScanRangeLocation replicaLocation = replicaLocations.get(i);
Worker worker = workerManager.getWorker(replicaLocation.getBackendId());
DistributedPlanWorker worker = workerManager.getWorker(replicaLocation.getBackendId());
if (!worker.available()) {
continue;
}
Expand All @@ -224,7 +224,7 @@ private WorkerScanRanges selectScanReplicaAndMinWorkloadWorker(
minWorkload.recordOneScanTask(tabletBytes);
ScanRanges scanRanges = new ScanRanges();
TScanRangeParams scanReplicaParams =
ScanWorkerSelector.buildScanReplicaParams(tabletLocation, selectedReplicaLocation);
buildScanReplicaParams(tabletLocation, selectedReplicaLocation);
scanRanges.addScanRange(scanReplicaParams, tabletBytes);
return new WorkerScanRanges(minWorkLoadWorker, scanRanges);
}
Expand All @@ -241,7 +241,7 @@ private List<Pair<TScanRangeParams, Long>> filterReplicaByWorkerInBucket(
boolean foundTabletInThisWorker = false;
for (TScanRangeLocation replicaLocation : onePartitionOneTabletLocation.getLocations()) {
if (replicaLocation.getBackendId() == filterWorkerId) {
TScanRangeParams scanReplicaParams = ScanWorkerSelector.buildScanReplicaParams(
TScanRangeParams scanReplicaParams = buildScanReplicaParams(
onePartitionOneTabletLocation, replicaLocation);
Long replicaSize = ((OlapScanNode) scanNode).getTabletSingleReplicaSize(tabletId);
selectedReplicasInOneBucket.add(Pair.of(scanReplicaParams, replicaSize));
Expand All @@ -255,7 +255,7 @@ private List<Pair<TScanRangeParams, Long>> filterReplicaByWorkerInBucket(
}
} else if (onePartitionOneTabletLocation.getLocations().size() == 1) {
TScanRangeLocation replicaLocation = onePartitionOneTabletLocation.getLocations().get(0);
TScanRangeParams scanReplicaParams = ScanWorkerSelector.buildScanReplicaParams(
TScanRangeParams scanReplicaParams = buildScanReplicaParams(
onePartitionOneTabletLocation, replicaLocation);
Long replicaSize = 0L;
selectedReplicasInOneBucket.add(Pair.of(scanReplicaParams, replicaSize));
Expand All @@ -280,7 +280,7 @@ private Map<Integer, Long> computeEachBucketScanBytes(
return bucketIndexToBytes;
}

private WorkerWorkload getWorkload(Worker worker) {
/** Returns the workload record for {@code worker}, creating and caching one on first access. */
private WorkerWorkload getWorkload(DistributedPlanWorker worker) {
    WorkerWorkload existing = workloads.get(worker);
    if (existing == null) {
        existing = new WorkerWorkload();
        workloads.put(worker, existing);
    }
    return existing;
}

Expand Down
Loading

0 comments on commit e007125

Please sign in to comment.