From a98f8c1d86d6994d71374bb9acee6615e5370a62 Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Tue, 25 Jun 2024 19:07:32 +0800 Subject: [PATCH] add profile info and instanceId to AssignedJob --- .../org/apache/doris/common/profile/Profile.java | 11 +++++++++++ .../apache/doris/common/profile/SummaryProfile.java | 2 ++ .../worker/job/AbstractUnassignedScanJob.java | 5 +++-- .../apache/doris/nereids/worker/job/AssignedJob.java | 3 +++ .../nereids/worker/job/LocalShuffleAssignedJob.java | 5 +++-- .../doris/nereids/worker/job/StaticAssignedJob.java | 12 +++++++++++- .../UnassignedGatherScanMultiRemoteTablesJob.java | 6 ++++-- .../doris/nereids/worker/job/UnassignedJob.java | 5 +++-- .../worker/job/UnassignedQueryConstantJob.java | 6 ++++-- .../worker/job/UnassignedScanBucketOlapTableJob.java | 9 +++++++-- .../nereids/worker/job/UnassignedShuffleJob.java | 3 ++- .../doris/planner/NereidsSpecifyInstances.java | 8 +++++++- .../java/org/apache/doris/qe/ConnectContext.java | 6 ++++++ .../main/java/org/apache/doris/qe/Coordinator.java | 8 +++++++- .../java/org/apache/doris/qe/NereidsCoordinator.java | 1 + 15 files changed, 74 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index 96cecd19ec62b3e..a1f5e16c1501f1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -20,6 +20,8 @@ import org.apache.doris.common.util.ProfileManager; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan; +import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import org.apache.doris.planner.Planner; @@ -108,6 +110,15 @@ public synchronized void updateSummary(long startTime, Map summa } summaryInfo.put(SummaryProfile.PHYSICAL_PLAN, builder.toString().replace("\n", "\n ")); + + + FragmentIdMapping distributedPlans = nereidsPlanner.getDistributedPlans(); + if (distributedPlans != null) { + summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN, + DistributedPlan.toString(Lists.newArrayList(distributedPlans.values())) + .replace("\n", "\n ") + ); + } } summaryProfile.update(summaryInfo); for (ExecutionProfile executionProfile : executionProfiles) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java index 9b5ec5d4cd8afc2..2e686171238e1ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java @@ -54,6 +54,7 @@ public class SummaryProfile { public static final String TRACE_ID = "Trace ID"; public static final String WORKLOAD_GROUP = "Workload Group"; public static final String PHYSICAL_PLAN = "Physical Plan"; + public static final String DISTRIBUTED_PLAN = "Distributed Plan"; // Execution Summary public static final String EXECUTION_SUMMARY_PROFILE_NAME = "Execution Summary"; public static final String ANALYSIS_TIME = "Analysis Time"; @@ -110,6 +111,7 @@ public class SummaryProfile { public static final ImmutableList SUMMARY_KEYS = new ImmutableList.Builder() .addAll(SUMMARY_CAPTIONS) .add(PHYSICAL_PLAN) + .add(DISTRIBUTED_PLAN) .build(); // The display order of execution summary items. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedScanJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedScanJob.java index bd0a46779813239..69e7ce1526ece83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedScanJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AbstractUnassignedScanJob.java @@ -60,6 +60,7 @@ protected List insideMachineParallelization( Map workerToScanRanges, ListMultimap inputJobs) { + ConnectContext context = ConnectContext.get(); boolean useLocalShuffleToAddParallel = useLocalShuffleToAddParallel(workerToScanRanges); int instanceIndexInFragment = 0; List instances = Lists.newArrayList(); @@ -106,7 +107,7 @@ protected List insideMachineParallelization( int shareScanId = shareScanIdGenerator.getAndIncrement(); for (int i = 0; i < instanceNum; i++) { LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob( - instanceIndexInFragment++, shareScanId, this, worker, shareScanSource); + instanceIndexInFragment++, shareScanId, context.nextInstanceId(), this, worker, shareScanSource); instances.add(instance); } } else { @@ -121,7 +122,7 @@ protected List insideMachineParallelization( ); for (ScanSource instanceToScanRange : instanceToScanRanges) { - instances.add(assignWorkerAndDataSources(instanceIndexInFragment++, worker, instanceToScanRange)); + instances.add(assignWorkerAndDataSources(instanceIndexInFragment++, context.nextInstanceId(), worker, instanceToScanRange)); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java index 31e14c64f706308..96b8c8a51d04c9b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/AssignedJob.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids.worker.job; import org.apache.doris.nereids.worker.Worker; +import org.apache.doris.thrift.TUniqueId; /** * AssignedJob. @@ -26,6 +27,8 @@ public interface AssignedJob { int indexInUnassignedJob(); + TUniqueId instanceId(); + UnassignedJob unassignedJob(); Worker getAssignedWorker(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/LocalShuffleAssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/LocalShuffleAssignedJob.java index f829a673e1b8685..3bb6c349ca065c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/LocalShuffleAssignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/LocalShuffleAssignedJob.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids.worker.job; import org.apache.doris.nereids.worker.Worker; +import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ImmutableMap; @@ -28,10 +29,10 @@ public class LocalShuffleAssignedJob extends StaticAssignedJob { public final int shareScanId; public LocalShuffleAssignedJob( - int indexInUnassignedJob, int shareScanId, + int indexInUnassignedJob, int shareScanId, TUniqueId instanceId, UnassignedJob unassignedJob, Worker worker, ScanSource scanSource) { - super(indexInUnassignedJob, unassignedJob, worker, scanSource); + super(indexInUnassignedJob, instanceId, unassignedJob, worker, scanSource); this.shareScanId = shareScanId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/StaticAssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/StaticAssignedJob.java index ec1f794318126c6..1e02767644bd013 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/StaticAssignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/StaticAssignedJob.java @@ -17,7 +17,9 @@ package org.apache.doris.nereids.worker.job; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.nereids.worker.Worker; +import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ImmutableMap; @@ -29,13 +31,15 @@ public class StaticAssignedJob implements AssignedJob { private final int indexInUnassignedJob; private final UnassignedJob unassignedJob; + private final TUniqueId instanceId; private final Worker worker; private final ScanSource scanSource; public StaticAssignedJob( - int indexInUnassignedJob, UnassignedJob unassignedJob, Worker worker, + int indexInUnassignedJob, TUniqueId instanceId, UnassignedJob unassignedJob, Worker worker, ScanSource scanSource) { this.indexInUnassignedJob = indexInUnassignedJob; + this.instanceId = Objects.requireNonNull(instanceId, "instanceId can not be null"); this.unassignedJob = Objects.requireNonNull(unassignedJob, "unassignedJob can not be null"); this.worker = worker; this.scanSource = Objects.requireNonNull(scanSource, "scanSource can not be null"); @@ -46,6 +50,11 @@ public int indexInUnassignedJob() { return indexInUnassignedJob; } + @Override + public TUniqueId instanceId() { + return instanceId; + } + @Override public UnassignedJob unassignedJob() { return unassignedJob; @@ -79,6 +88,7 @@ public String toString(boolean showUnassignedJob) { str.append("\n unassignedJob: ").append(unassignedJob).append(","); } str.append("\n index: " + indexInUnassignedJob) + .append(",\n instanceId: " + DebugUtil.printId(instanceId)) .append(",\n worker: " + worker); for (Entry kv : extraInfo().entrySet()) { str.append(",\n ").append(kv.getKey()).append(": ").append(kv.getValue()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java index bc64e256e8fdc76..da0bc1b7c3aaa4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedGatherScanMultiRemoteTablesJob.java @@ -24,6 +24,7 @@ import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.ScanNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TScanRangeParams; @@ -62,6 +63,7 @@ public static boolean canApply(List scanNodes) { @Override public List computeAssignedJobs(WorkerManager workerManager, ListMultimap inputJobs) { + ConnectContext context = ConnectContext.get(); Map scanNodeToScanRanges = Maps.newLinkedHashMap(); for (ScanNode scanNode : scanNodes) { List scanRangeLocations = scanNode.getScanRangeLocations(0); @@ -77,8 +79,8 @@ public List computeAssignedJobs(WorkerManager workerManager, Worker randomWorker = workerManager.randomAvailableWorker(); return ImmutableList.of( - assignWorkerAndDataSources(0, randomWorker, - new DefaultScanSource(scanNodeToScanRanges) + assignWorkerAndDataSources(0, context.nextInstanceId(), + randomWorker, new DefaultScanSource(scanNodeToScanRanges) ) ); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java index 73af5c4f65af272..8c88e7edff51807 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedJob.java @@ -23,6 +23,7 @@ import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.ScanNode; +import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ListMultimap; @@ -45,7 +46,7 @@ List computeAssignedJobs( // generate an instance job // e.g. build an instance job by a backends and the replica ids it contains default AssignedJob assignWorkerAndDataSources( - int instanceIndexInFragment, Worker worker, ScanSource scanSource) { - return new StaticAssignedJob(instanceIndexInFragment, this, worker, scanSource); + int instanceIndexInFragment, TUniqueId instanceId, Worker worker, ScanSource scanSource) { + return new StaticAssignedJob(instanceIndexInFragment, instanceId, this, worker, scanSource); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java index 71ca43ab82547a7..c7371b64fb77dd8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedQueryConstantJob.java @@ -21,6 +21,7 @@ import org.apache.doris.nereids.worker.WorkerManager; import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; @@ -39,9 +40,10 @@ public UnassignedQueryConstantJob(PlanFragment fragment) { public List computeAssignedJobs(WorkerManager workerManager, ListMultimap inputJobs) { Worker randomWorker = workerManager.randomAvailableWorker(); + ConnectContext context = ConnectContext.get(); return ImmutableList.of( - new StaticAssignedJob(0, this, randomWorker, - new DefaultScanSource(ImmutableMap.of()) + new StaticAssignedJob(0, context.nextInstanceId(), this, + randomWorker, new DefaultScanSource(ImmutableMap.of()) ) ); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanBucketOlapTableJob.java index 11e07dde433249c..015d20d280ede97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanBucketOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedScanBucketOlapTableJob.java @@ -31,6 +31,7 @@ import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.ScanNode; +import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; @@ -187,6 +188,9 @@ private List fillUpInstances( if (missingBucketsInLeft.isEmpty()) { return leftSideInstances; } + + ConnectContext context = ConnectContext.get(); + OlapScanNode olapScanNode = (OlapScanNode) scanNodes.get(0); MaterializedIndex randomPartition = randomPartition(olapScanNode); ListMultimap missingBuckets = selectWorkerForMissingBuckets( @@ -221,12 +225,13 @@ private List fillUpInstances( } if (!mergedBucketsInSameWorkerInstance) { fillUpInstance = new LocalShuffleAssignedJob( - newInstances.size(), shareScanIdGenerator.getAndIncrement(), this, worker, scanSource + newInstances.size(), shareScanIdGenerator.getAndIncrement(), + context.nextInstanceId(), this, worker, scanSource ); } } else { fillUpInstance = assignWorkerAndDataSources( - newInstances.size(), worker, scanSource + newInstances.size(), context.nextInstanceId(), worker, scanSource ); } if (fillUpInstance != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java index d1c7d46b3e1e916..22131db4c9f9d20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/worker/job/UnassignedShuffleJob.java @@ -101,10 +101,11 @@ private List getInstancesOfBiggestParallelChildFragment( private List buildInstances(int instanceNum, Function workerSelector) { ImmutableList.Builder instances = ImmutableList.builderWithExpectedSize(instanceNum); + ConnectContext context = ConnectContext.get(); for (int i = 0; i < instanceNum; i++) { Worker selectedWorker = workerSelector.apply(i); AssignedJob assignedJob = assignWorkerAndDataSources( - i, selectedWorker, new DefaultScanSource(ImmutableMap.of()) + i, context.nextInstanceId(), selectedWorker, new DefaultScanSource(ImmutableMap.of()) ); instances.add(assignedJob); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NereidsSpecifyInstances.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NereidsSpecifyInstances.java index 40309969c8a7f05..1f6722cddc2aab9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NereidsSpecifyInstances.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NereidsSpecifyInstances.java @@ -26,6 +26,8 @@ import org.apache.doris.nereids.worker.job.StaticAssignedJob; import org.apache.doris.nereids.worker.job.UnassignedJob; import org.apache.doris.nereids.worker.job.WorkerScanSource; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; @@ -44,10 +46,14 @@ public NereidsSpecifyInstances(List> workerScanSources) { public List buildAssignedJobs(UnassignedJob unassignedJob) { List instances = Lists.newArrayListWithCapacity(workerScanSources.size()); int instanceNum = 0; + ConnectContext context = ConnectContext.get(); for (WorkerScanSource workerToScanSource : workerScanSources) { + TUniqueId instanceId = context.nextInstanceId(); Worker worker = workerToScanSource.worker; ScanSource scanSource = workerToScanSource.scanSource; - StaticAssignedJob assignedJob = new StaticAssignedJob(instanceNum++, unassignedJob, worker, scanSource); + StaticAssignedJob assignedJob = new StaticAssignedJob( + instanceNum++, instanceId, unassignedJob, worker, scanSource + ); instances.add(assignedJob); } return instances; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index df1a4ac24084028..63285ac00ea4af7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -88,6 +88,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; @@ -117,6 +118,7 @@ public enum ConnectType { protected volatile LoadTaskInfo streamLoadInfo; protected volatile TUniqueId queryId = null; + protected volatile AtomicInteger instanceIdGenerator = new AtomicInteger(); protected volatile String traceId; // id for this connection protected volatile int connectionId; @@ -863,6 +865,10 @@ public TUniqueId queryId() { return queryId; } + public TUniqueId nextInstanceId() { + return new TUniqueId(queryId.hi, queryId.lo + instanceIdGenerator.incrementAndGet()); + } + public String getSqlHash() { return sqlHash; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 4874d477c932177..37d5d60419411b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1308,10 +1308,16 @@ protected void computeFragmentExecParams() throws Exception { for (int j = 0; j < params.instanceExecParams.size(); ++j) { // we add instance_num to query_id.lo to create a // globally-unique instance id + FInstanceExecParam instanceExecParam = params.instanceExecParams.get(j); + + // already set by nereids coordinator? + if (instanceExecParam.instanceId != null) { + continue; + } TUniqueId instanceId = new TUniqueId(); instanceId.setHi(queryId.hi); instanceId.setLo(queryId.lo + instanceIds.size() + 1); - params.instanceExecParams.get(j).instanceId = instanceId; + instanceExecParam.instanceId = instanceId; instanceIds.add(instanceId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java index 5a67e95b706c587..0d1b8df9d3dd45a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java @@ -100,6 +100,7 @@ protected void computeFragmentHosts() { TNetworkAddress address = new TNetworkAddress(worker.host(), worker.port()); FInstanceExecParam instanceExecParam = new FInstanceExecParam( null, address, 0, fragmentExecParams); + instanceExecParam.instanceId = instanceJob.instanceId(); fragmentExecParams.instanceExecParams.add(instanceExecParam); addressToBackendID.put(address, worker.id()); ScanSource scanSource = instanceJob.getScanSource();