From 98ab59f490f7de0e538f8f9c4afee9d1d4101ed8 Mon Sep 17 00:00:00 2001
From: 924060929 <924060929@qq.com>
Date: Fri, 25 Oct 2024 22:33:27 +0800
Subject: [PATCH] support broker load

---
 .../org/apache/doris/catalog/EnvFactory.java  |  23 +-
 .../apache/doris/nereids/NereidsPlanner.java  |   4 +
 .../doris/qe/NereidsSqlCoordinator.java       |  42 ++--
 .../org/apache/doris/qe/SessionVariable.java  |   4 +-
 .../doris/qe/SqlCoordinatorContext.java       | 230 +++++++++++-------
 .../doris/qe/runtime/LoadProcessor.java       |   4 +-
 .../doris/qe/runtime/QueryProcessor.java      |  22 +-
 .../runtime/RuntimeFiltersThriftBuilder.java  |   4 +-
 .../doris/qe/runtime/SqlPipelineTask.java     |   6 +-
 .../doris/qe/runtime/ThriftPlansBuilder.java  |  39 ++-
 10 files changed, 219 insertions(+), 159 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
index 97dbf31d0444ec..ab0cec460e7117 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
@@ -33,7 +33,11 @@
 import org.apache.doris.load.loadv2.LoadManager;
 import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
+import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
+import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
 import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.Planner;
@@ -147,8 +151,23 @@ public Coordinator createCoordinator(ConnectContext context, Analyzer analyzer,
     public Coordinator createCoordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable,
             List<PlanFragment> fragments, List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance,
             boolean enableProfile) {
-        return new Coordinator(jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance,
-                enableProfile);
+        if (SessionVariable.canUseNereidsDistributePlanner()) {
+            ConnectContext connectContext = new ConnectContext();
+            connectContext.setQueryId(queryId);
+            StatementContext statementContext = new StatementContext(
+                    connectContext, new OriginStatement("", 0)
+            );
+            DistributePlanner distributePlanner = new DistributePlanner(statementContext, fragments);
+            FragmentIdMapping<DistributedPlan> distributedPlans = distributePlanner.plan();
+
+            return new NereidsSqlCoordinator(
+                    jobId, queryId, descTable, fragments, distributedPlans.valueList(),
+                    scanNodes, timezone, loadZeroTolerance, enableProfile
+            );
+        }
+        return new Coordinator(
+                jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance, enableProfile
+        );
     }

     public GroupCommitPlanner createGroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames,
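For orientation, this factory branch is the entry point the broker-load path goes through. A minimal caller sketch; here jobId, loadId, descTable, fragments, scanNodes and timezone stand in for values prepared by the load task, and EnvFactory.getInstance() is the existing accessor, so only createCoordinator itself comes from this patch:

    // Load-side usage sketch: obtain a coordinator for an already-planned load.
    Coordinator coordinator = EnvFactory.getInstance().createCoordinator(
            jobId, loadId, descTable, fragments, scanNodes,
            timezone, /* loadZeroTolerance */ false, /* enableProfile */ true);
    // When canUseNereidsDistributePlanner() is true, this is a NereidsSqlCoordinator
    // wrapping the DistributePlanner output; otherwise it is the legacy Coordinator.
    coordinator.exec();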
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 1d420f681c8370..26665275ff636b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -736,6 +736,10 @@ public CascadesContext getCascadesContext() {
         return cascadesContext;
     }

+    public ConnectContext getConnectContext() {
+        return cascadesContext.getConnectContext();
+    }
+
     public static PhysicalProperties buildInitRequireProperties() {
         return PhysicalProperties.GATHER;
     }

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsSqlCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsSqlCoordinator.java
index 5631bb6f5fab26..15c133808c9d5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsSqlCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsSqlCoordinator.java
@@ -18,6 +18,7 @@
 package org.apache.doris.qe;

 import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.FsBroker;
@@ -26,7 +27,6 @@
 import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.ExecutionProfile;
-import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
@@ -77,30 +77,39 @@ public class NereidsSqlCoordinator extends Coordinator {
     private static final Logger LOG = LogManager.getLogger(NereidsSqlCoordinator.class);

-    private final SqlCoordinatorContext coordinatorContext;
+    protected final SqlCoordinatorContext coordinatorContext;

-    private volatile SqlPipelineTask executionTask;
+    protected volatile SqlPipelineTask executionTask;

     public NereidsSqlCoordinator(ConnectContext context, Analyzer analyzer,
             NereidsPlanner planner, StatsErrorEstimator statsErrorEstimator) {
         super(context, analyzer, planner, statsErrorEstimator);

-        this.coordinatorContext = SqlCoordinatorContext.build(planner, this);
+        this.coordinatorContext = SqlCoordinatorContext.buildForSql(planner, this);
         this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext));

         Preconditions.checkState(!planner.getFragments().isEmpty()
-                && coordinatorContext.instanceNum > 0, "Fragment and Instance can not be empty˚");
+                && coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty");
+    }
+
+    public NereidsSqlCoordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable,
+            List<PlanFragment> fragments, List<PipelineDistributedPlan> distributedPlans,
+            List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance,
+            boolean enableProfile) {
+        super(jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance, enableProfile);
+        this.coordinatorContext = SqlCoordinatorContext.buildForLoad(
+                this, jobId, queryId, fragments, distributedPlans, scanNodes,
+                descTable, timezone, loadZeroTolerance, enableProfile
+        );
     }

     @Override
     public void exec() throws Exception {
-        coordinatorContext.updateProfileIfPresent(SummaryProfile::setAssignFragmentTime);
-        enqueue(coordinatorContext.connectContext);
-        processTopSink(coordinatorContext, coordinatorContext.planner);
+        processTopSink(coordinatorContext, coordinatorContext.topDistributedPlan);

-        QeProcessorImpl.INSTANCE.registerInstances(coordinatorContext.queryId, coordinatorContext.instanceNum);
+        QeProcessorImpl.INSTANCE.registerInstances(coordinatorContext.queryId, coordinatorContext.instanceNum.get());

         Map<DistributedPlanWorker, TPipelineFragmentParamsList> workerToFragments
                 = ThriftPlansBuilder.plansToThrift(coordinatorContext);
@@ -110,14 +119,14 @@ public void exec() throws Exception {

     @Override
     public boolean isTimeout() {
-        return System.currentTimeMillis() > coordinatorContext.timeoutDeadline;
+        return System.currentTimeMillis() > coordinatorContext.timeoutDeadline.get();
     }
     @Override
     public void cancel(Status cancelReason) {
         coordinatorContext.getQueueToken().ifPresent(QueueToken::cancel);
-        for (ScanNode scanNode : coordinatorContext.planner.getScanNodes()) {
+        for (ScanNode scanNode : coordinatorContext.scanNodes) {
             scanNode.stop();
         }

@@ -326,7 +335,7 @@ public List getFragmentInstanceInfos() {

     @Override
     public List<PlanFragment> getFragments() {
-        return coordinatorContext.planner.getFragments();
+        return coordinatorContext.fragments;
     }

     @Override
@@ -389,7 +398,7 @@ public void close() {
         }

         try {
-            for (ScanNode scanNode : coordinatorContext.planner.getScanNodes()) {
+            for (ScanNode scanNode : coordinatorContext.scanNodes) {
                 scanNode.stop();
             }
         } catch (Throwable t) {
@@ -401,9 +410,8 @@ protected void cancelInternal(Status cancelReason) {
         coordinatorContext.withLock(() -> coordinatorContext.getJobProcessor().cancel(cancelReason));
     }

-    private void processTopSink(SqlCoordinatorContext coordinatorContext, NereidsPlanner nereidsPlanner)
-            throws AnalysisException {
-        PipelineDistributedPlan topPlan = (PipelineDistributedPlan) nereidsPlanner.getDistributedPlans().last();
+    protected void processTopSink(
+            SqlCoordinatorContext coordinatorContext, PipelineDistributedPlan topPlan) throws AnalysisException {
         setForArrowFlight(coordinatorContext, topPlan);
         setForBroker(coordinatorContext, topPlan);
     }
@@ -474,7 +482,7 @@ private boolean shouldQueue(ConnectContext context) {
             return false;
         }
         // a query with ScanNode need not queue only when all its scan node is SchemaScanNode
-        for (ScanNode scanNode : coordinatorContext.planner.getScanNodes()) {
+        for (ScanNode scanNode : coordinatorContext.scanNodes) {
             if (!(scanNode instanceof SchemaScanNode)) {
                 return true;
             }

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 3c66b3648e17c1..caba12515c0b19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -3373,11 +3373,11 @@ public static boolean canUseNereidsDistributePlanner() {
         }
         ConnectContext connectContext = ConnectContext.get();
         if (connectContext == null) {
-            return false;
+            return true;
         }
         StatementContext statementContext = connectContext.getStatementContext();
         if (statementContext == null) {
-            return false;
+            return true;
         }
         StatementBase parsedStatement = statementContext.getParsedStatement();
         if (!(parsedStatement instanceof LogicalPlanAdapter)) {
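The two `return false` to `return true` flips above are what let internal executions reach the new code path at all: a broker load runs without a ConnectContext (and without a StatementContext even when a session exists), so returning false here silently pinned every such job to the legacy Coordinator. A condensed view of the decision order after the change (simplified; the surrounding feature-flag and statement-type checks are unchanged):

    // Inside canUseNereidsDistributePlanner(), after this patch:
    ConnectContext connectContext = ConnectContext.get();
    if (connectContext == null) {
        return true;   // internal execution (e.g. broker load): no session to consult
    }
    StatementContext statementContext = connectContext.getStatementContext();
    if (statementContext == null) {
        return true;   // nothing being planned in this session: do not veto the planner
    }
    // ... otherwise decide from the parsed statement, as before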
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SqlCoordinatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SqlCoordinatorContext.java
index 73d552ec5eb0c0..9d98c013263aca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SqlCoordinatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SqlCoordinatorContext.java
@@ -17,6 +17,7 @@
 package org.apache.doris.qe;

+import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Status;
@@ -29,11 +30,10 @@
 import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
 import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
-import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
-import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
-import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges;
-import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
+import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
 import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.RuntimeFilter;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.runtime.LoadProcessor;
 import org.apache.doris.qe.runtime.QueryProcessor;
@@ -48,6 +48,7 @@
 import org.apache.doris.thrift.TResourceLimit;
 import org.apache.doris.thrift.TUniqueId;

+import com.clearspring.analytics.util.Lists;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
@@ -72,74 +73,100 @@ public class SqlCoordinatorContext {

     // these are some constant parameters
     public final NereidsSqlCoordinator coordinator;
+    public final List<PlanFragment> fragments;
+    public final boolean isBlockQuery;
     public final DataSink dataSink;
     public final ExecutionProfile executionProfile;
     public final ConnectContext connectContext;
-    public final NereidsPlanner planner;
+    public final PipelineDistributedPlan topDistributedPlan;
+    public final List<PipelineDistributedPlan> distributedPlans;
     public final TUniqueId queryId;
     public final TQueryGlobals queryGlobals;
     public final TQueryOptions queryOptions;
     public final TDescriptorTable descriptorTable;
-    public final TNetworkAddress coordinatorAddress;
-    public final TNetworkAddress directConnectFrontendAddress;
-    public final long timeoutDeadline;
-    public final boolean twoPhaseExecution;
-    public final int instanceNum;
+    public final TNetworkAddress coordinatorAddress = new TNetworkAddress(Coordinator.localIP, Config.rpc_port);
+    public final List<RuntimeFilter> runtimeFilters;
+    public final List<TopnFilter> topnFilters;
+    public final List<ScanNode> scanNodes;
+    public final Supplier<Long> timeoutDeadline = Suppliers.memoize(this::computeTimeoutDeadline);
+    public final Supplier<Integer> instanceNum = Suppliers.memoize(this::computeInstanceNum);
     public final Supplier<Set<TUniqueId>> instanceIds = Suppliers.memoize(this::getInstanceIds);
     public final Supplier<Map<TNetworkAddress, Long>> backends = Suppliers.memoize(this::getBackends);
     public final Supplier<Integer> scanRangeNum = Suppliers.memoize(this::getScanRangeNum);
+    public final Supplier<TNetworkAddress> directConnectFrontendAddress
+            = Suppliers.memoize(this::computeDirectConnectCoordinator);

     // these are some mutable states
-    private volatile Status status;
-    private volatile Optional queryQueue;
-    private volatile Optional queueToken;
+    private volatile Status status = new Status();
+    private volatile Optional<QueryQueue> queryQueue = Optional.empty();
+    private volatile Optional<QueueToken> queueToken = Optional.empty();
     private volatile List workloadGroups = ImmutableList.of();

     // query or load processor
     private volatile JobProcessor jobProcessor;
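The switch from plain `long`/`int` fields to memoized suppliers above is the core trick that lets one context class serve both construction paths: deriving these values eagerly in every constructor would duplicate logic and fix the timeout clock at construction time, while the memoized supplier defers the computation to first use and caches the result. A self-contained sketch of the Guava pattern (illustrative class, not part of the patch):

    import com.google.common.base.Supplier;
    import com.google.common.base.Suppliers;

    class DeadlineHolder {
        private final long timeoutMs;
        // Computed once at the first get() and cached afterwards, exactly like
        // timeoutDeadline and instanceNum above.
        final Supplier<Long> deadline = Suppliers.memoize(this::compute);

        DeadlineHolder(long timeoutMs) {
            this.timeoutMs = timeoutMs;
        }

        private Long compute() {
            return System.currentTimeMillis() + timeoutMs;
        }
    }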
-    private SqlCoordinatorContext(NereidsSqlCoordinator coordinator,
+    // for sql execution
+    private SqlCoordinatorContext(
+            NereidsSqlCoordinator coordinator,
             ConnectContext connectContext,
-            NereidsPlanner planner,
+            boolean isBlockQuery,
+            List<PipelineDistributedPlan> distributedPlans,
+            List<PlanFragment> fragments,
+            List<RuntimeFilter> runtimeFilters,
+            List<TopnFilter> topnFilters,
+            List<ScanNode> scanNodes,
             ExecutionProfile executionProfile,
             TQueryGlobals queryGlobals,
             TQueryOptions queryOptions,
-            TDescriptorTable descriptorTable,
-            TNetworkAddress coordinatorAddress,
-            TNetworkAddress directConnectFrontendAddress) {
+            TDescriptorTable descriptorTable) {
         this.connectContext = connectContext;
-        this.planner = planner;
+        this.isBlockQuery = isBlockQuery;
+        this.fragments = fragments;
+        this.distributedPlans = distributedPlans;
+        this.topDistributedPlan = distributedPlans.get(distributedPlans.size() - 1);
+        this.dataSink = topDistributedPlan.getFragmentJob().getFragment().getSink();
+        this.runtimeFilters = runtimeFilters == null ? Lists.newArrayList() : runtimeFilters;
+        this.topnFilters = topnFilters == null ? Lists.newArrayList() : topnFilters;
+        this.scanNodes = scanNodes;
         this.queryId = connectContext.queryId();
         this.executionProfile = executionProfile;
         this.queryGlobals = queryGlobals;
         this.queryOptions = queryOptions;
         this.descriptorTable = descriptorTable;
-        this.coordinatorAddress = coordinatorAddress;
-        this.directConnectFrontendAddress = directConnectFrontendAddress;
-
-        PipelineDistributedPlan topPlan = (PipelineDistributedPlan) planner.getDistributedPlans().last();
-        this.dataSink = topPlan.getFragmentJob().getFragment().getSink();
-
-        // If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
-        // else use exec_plan_fragments directly.
-        // we choose #fragments > 1 because in some cases
-        // we need ensure that A fragment is already prepared to receive data before B fragment sends data.
-        // For example: select * from numbers("number"="10") will generate ExchangeNode and
-        // TableValuedFunctionScanNode, we should ensure TableValuedFunctionScanNode does not
-        // send data until ExchangeNode is ready to receive.
-        this.twoPhaseExecution = planner.getDistributedPlans().size() > 1;
-        this.timeoutDeadline = System.currentTimeMillis() + queryOptions.getExecutionTimeout() * 1000L;
         this.coordinator = Objects.requireNonNull(coordinator, "coordinator can not be null");
-        this.status = new Status();
+    }

-        this.instanceNum = planner.getDistributedPlans().valueList()
-                .stream().map(plan -> ((PipelineDistributedPlan) plan).getInstanceJobs().size())
-                .reduce(Integer::sum)
-                .get();
+    // for broker load
+    private SqlCoordinatorContext(
+            NereidsSqlCoordinator coordinator,
+            long jobId,
+            List<PlanFragment> fragments,
+            List<PipelineDistributedPlan> distributedPlans,
+            List<ScanNode> scanNodes,
+            TUniqueId queryId,
+            TQueryOptions queryOptions,
+            TQueryGlobals queryGlobals,
+            TDescriptorTable descriptorTable,
+            ExecutionProfile executionProfile) {
+        this.coordinator = coordinator;
+        this.isBlockQuery = true;
+        this.fragments = fragments;
+        this.distributedPlans = distributedPlans;
+        this.topDistributedPlan = distributedPlans.get(distributedPlans.size() - 1);
+        this.dataSink = topDistributedPlan.getFragmentJob().getFragment().getSink();
+        this.scanNodes = scanNodes;
+        this.queryId = queryId;
+        this.queryOptions = queryOptions;
+        this.queryGlobals = queryGlobals;
+        this.descriptorTable = descriptorTable;
+        this.executionProfile = executionProfile;

-        this.queryQueue = Optional.empty();
-        this.queueToken = Optional.empty();
+        this.connectContext = ConnectContext.get();
+        this.runtimeFilters = ImmutableList.of();
+        this.topnFilters = ImmutableList.of();
+        this.jobProcessor = new LoadProcessor(this, jobId);
     }

     public void setQueueInfo(QueryQueue queryQueue, QueueToken queueToken) {
@@ -170,6 +197,17 @@ public void updateProfileIfPresent(Consumer profileAction) {
                 .ifPresent(profileAction);
     }

+    public boolean twoPhaseExecution() {
+        // If #fragments >= 2, use two-phase execution with exec_plan_fragments_prepare
+        // and exec_plan_fragments_start; otherwise use exec_plan_fragments directly.
+        // We choose #fragments > 1 because in some cases we need to ensure that
+        // fragment A is already prepared to receive data before fragment B sends data.
+        // For example: select * from numbers("number"="10") will generate an ExchangeNode
+        // and a TableValuedFunctionScanNode, and we should ensure the TableValuedFunctionScanNode
+        // does not send data until the ExchangeNode is ready to receive it.
+        return distributedPlans.size() > 1;
+    }
+
     public boolean isEos() {
         return jobProcessor instanceof QueryProcessor && coordinator.isEos();
     }
@@ -228,17 +266,11 @@ public QueryProcessor asQueryProcessor() {
         return (QueryProcessor) jobProcessor;
     }

-    public static SqlCoordinatorContext build(NereidsPlanner planner, NereidsSqlCoordinator coordinator) {
+    public static SqlCoordinatorContext buildForSql(NereidsPlanner planner, NereidsSqlCoordinator coordinator) {
         ConnectContext connectContext = planner.getCascadesContext().getConnectContext();
         TQueryOptions queryOptions = initQueryOptions(connectContext);
         TQueryGlobals queryGlobals = initQueryGlobals(connectContext);
         TDescriptorTable descriptorTable = planner.getDescTable().toThrift();
-        TNetworkAddress coordinatorAddress = new TNetworkAddress(Coordinator.localIP, Config.rpc_port);
-        String currentConnectedFEIp = connectContext.getCurrentConnectedFEIp();
-        TNetworkAddress directConnectFrontendAddress =
-                connectContext.isProxy() && !StringUtils.isBlank(currentConnectedFEIp)
-                        ? new TNetworkAddress(currentConnectedFEIp, Config.rpc_port)
-                        : coordinatorAddress;

         ExecutionProfile executionProfile = new ExecutionProfile(
                 connectContext.queryId,
                 planner.getFragments().stream()
                         .map(fragment -> fragment.getFragmentId().asInt())
                         .collect(Collectors.toList())
         );
         return new SqlCoordinatorContext(
-                coordinator, connectContext, planner, executionProfile, queryGlobals, queryOptions, descriptorTable,
-                coordinatorAddress, directConnectFrontendAddress
+                coordinator, connectContext, planner.isBlockQuery(),
+                planner.getDistributedPlans().valueList(),
+                planner.getFragments(), planner.getRuntimeFilters(), planner.getTopnFilters(),
+                planner.getScanNodes(), executionProfile, queryGlobals, queryOptions, descriptorTable
         );
     }

+    public static SqlCoordinatorContext buildForLoad(
+            NereidsSqlCoordinator coordinator,
+            long jobId, TUniqueId queryId,
+            List<PlanFragment> fragments,
+            List<PipelineDistributedPlan> distributedPlans,
+            List<ScanNode> scanNodes,
+            DescriptorTable descTable,
+            String timezone, boolean loadZeroTolerance,
+            boolean enableProfile) {
+        TQueryOptions queryOptions = new TQueryOptions();
+        queryOptions.setEnableProfile(enableProfile);
+        queryOptions.setBeExecVersion(Config.be_exec_version);
+
+        TQueryGlobals queryGlobals = new TQueryGlobals();
+        queryGlobals.setNowString(TimeUtils.getDatetimeFormatWithTimeZone().format(LocalDateTime.now()));
+        queryGlobals.setTimestampMs(System.currentTimeMillis());
+        queryGlobals.setTimeZone(timezone);
+        queryGlobals.setLoadZeroTolerance(loadZeroTolerance);
+
+        ExecutionProfile executionProfile = new ExecutionProfile(
+                queryId,
+                fragments.stream()
+                        .map(fragment -> fragment.getFragmentId().asInt())
+                        .collect(Collectors.toList())
+        );
+
+        return new SqlCoordinatorContext(coordinator, jobId, fragments, distributedPlans,
+                scanNodes, queryId, queryOptions, queryGlobals, descTable.toThrift(),
+                executionProfile);
+    }
+
     private static TQueryOptions initQueryOptions(ConnectContext context) {
         TQueryOptions queryOptions = context.getSessionVariable().toThrift();
         queryOptions.setBeExecVersion(Config.be_exec_version);
@@ -307,7 +372,7 @@ private static void setOptionsFromUserProperty(ConnectContext connectContext, TQ

     private Set<TUniqueId> getInstanceIds() {
         Set<TUniqueId> instanceIds = Sets.newLinkedHashSet();
-        for (DistributedPlan distributedPlan : planner.getDistributedPlans().valueList()) {
+        for (DistributedPlan distributedPlan : distributedPlans) {
             PipelineDistributedPlan pipelinePlan = (PipelineDistributedPlan) distributedPlan;
             List<AssignedJob> instanceJobs = pipelinePlan.getInstanceJobs();
             for (AssignedJob instanceJob : instanceJobs) {
@@ -317,9 +382,21 @@ private Set getInstanceIds() {
         return instanceIds;
     }

+    private Integer computeInstanceNum() {
+        return distributedPlans
+                .stream()
+                .map(plan -> plan.getInstanceJobs().size())
+                .reduce(Integer::sum)
+                .get();
+    }
+
+    private long computeTimeoutDeadline() {
+        return System.currentTimeMillis() + queryOptions.getExecutionTimeout() * 1000L;
+    }
+
     private Map<TNetworkAddress, Long> getBackends() {
         Map<TNetworkAddress, Long> backends = Maps.newLinkedHashMap();
-        for (DistributedPlan distributedPlan : planner.getDistributedPlans().valueList()) {
+        for (DistributedPlan distributedPlan : distributedPlans) {
             PipelineDistributedPlan pipelinePlan = (PipelineDistributedPlan) distributedPlan;
             List<AssignedJob> instanceJobs = pipelinePlan.getInstanceJobs();
             for (AssignedJob instanceJob : instanceJobs) {
@@ -330,52 +407,19 @@ private Map getBackends() {
         return backends;
     }

-    private Set<ScanNode> getScanNodes() {
-        Set<ScanNode> scanNodes = Sets.newLinkedHashSet();
-
-        for (DistributedPlan distributedPlan : planner.getDistributedPlans().valueList()) {
-            PipelineDistributedPlan pipelinePlan = (PipelineDistributedPlan) distributedPlan;
-            List<AssignedJob> instanceJobs = pipelinePlan.getInstanceJobs();
-            for (AssignedJob instanceJob : instanceJobs) {
-                ScanSource scanSource = instanceJob.getScanSource();
-                if (scanSource instanceof DefaultScanSource) {
-                    for (ScanNode scanNode : ((DefaultScanSource) scanSource).scanNodeToScanRanges.keySet()) {
-                        scanNodes.add(scanNode);
-                    }
-                } else {
-                    BucketScanSource bucketScanSource = (BucketScanSource) scanSource;
-                    for (Map<ScanNode, ScanRanges> scanNodeToRanges
-                            : bucketScanSource.bucketIndexToScanNodeToTablets.values()) {
-                        scanNodes.addAll(scanNodeToRanges.keySet());
-                    }
-                }
-            }
+    private TNetworkAddress computeDirectConnectCoordinator() {
+        if (connectContext != null && connectContext.isProxy()
+                && !StringUtils.isEmpty(connectContext.getCurrentConnectedFEIp())) {
+            return new TNetworkAddress(connectContext.getCurrentConnectedFEIp(), Config.rpc_port);
+        } else {
+            return coordinatorAddress;
         }
-        return scanNodes;
     }

     private int getScanRangeNum() {
         int scanRangeNum = 0;
-
-        for (DistributedPlan distributedPlan : planner.getDistributedPlans().valueList()) {
-            PipelineDistributedPlan pipelinePlan = (PipelineDistributedPlan) distributedPlan;
-            List<AssignedJob> instanceJobs = pipelinePlan.getInstanceJobs();
-            for (AssignedJob instanceJob : instanceJobs) {
-                ScanSource scanSource = instanceJob.getScanSource();
-                if (scanSource instanceof DefaultScanSource) {
-                    for (ScanRanges scanRanges : ((DefaultScanSource) scanSource).scanNodeToScanRanges.values()) {
-                        scanRangeNum += scanRanges.params.size();
-                    }
-                } else {
-                    BucketScanSource bucketScanSource = (BucketScanSource) scanSource;
-                    for (Map<ScanNode, ScanRanges> scanNodeToRanges
-                            : bucketScanSource.bucketIndexToScanNodeToTablets.values()) {
-                        for (ScanRanges scanRanges : scanNodeToRanges.values()) {
-                            scanRangeNum += scanRanges.params.size();
-                        }
-                    }
-                }
-            }
+        for (ScanNode scanNode : scanNodes) {
+            scanRangeNum += scanNode.getScanRangeNum();
         }
         return scanRangeNum;
     }
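One small review note on computeInstanceNum() above: Stream.reduce(Integer::sum) without an identity returns an Optional, so the trailing .get() throws NoSuchElementException if distributedPlans is ever empty. The SQL path guards this with the Preconditions check in NereidsSqlCoordinator, and the load path relies on the planner always emitting at least one fragment. An equivalent form that cannot throw, should that invariant ever be relaxed:

    private Integer computeInstanceNum() {
        // mapToInt(...).sum() returns 0 for an empty plan list instead of throwing
        return distributedPlans.stream()
                .mapToInt(plan -> plan.getInstanceJobs().size())
                .sum();
    }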
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
index 4a7bb93107ec5b..17151d28ee5a2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/LoadProcessor.java
@@ -101,9 +101,7 @@ public void setSqlPipelineTask(SqlPipelineTask sqlPipelineTask) {
         }
         this.latch = Optional.of(latch);

-        int topFragmentId = coordinatorContext.planner
-                .getDistributedPlans()
-                .last()
+        int topFragmentId = coordinatorContext.topDistributedPlan
                 .getFragmentJob()
                 .getFragment()
                 .getFragmentId()

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
index a6adce618f00b0..766f2589d36fd6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java
@@ -20,7 +20,6 @@
 import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
 import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
 import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
@@ -67,10 +66,7 @@ public QueryProcessor(SqlCoordinatorContext coordinatorContext, List
-        List fragments = coordinatorContext.planner.getDistributedPlans().valueList();
-        this.limitRows = fragments.get(fragments.size() - 1)
-                .getFragmentJob()
-                .getFragment()
+        this.limitRows = coordinatorContext.fragments.get(coordinatorContext.fragments.size() - 1)
                 .getPlanRoot()
                 .getLimit();

@@ -78,10 +74,7 @@ public QueryProcessor(SqlCoordinatorContext coordinatorContext, List
-        List distributedPlans = coordinatorContext.planner.getDistributedPlans().valueList();
-        PipelineDistributedPlan topFragment =
-                (PipelineDistributedPlan) distributedPlans.get(distributedPlans.size() - 1);
-
+        PipelineDistributedPlan topFragment = coordinatorContext.topDistributedPlan;
         DataSink topDataSink = coordinatorContext.dataSink;
         Boolean enableParallelResultSink;
         if (topDataSink instanceof ResultSink) {
@@ -106,11 +99,8 @@ public static QueryProcessor build(SqlCoordinatorContext coordinatorContext) {
                         topInstance.instanceId(),
                         topWorker.id(),
                         execBeAddr,
-                        coordinatorContext.timeoutDeadline,
-                        coordinatorContext.planner.getCascadesContext()
-                                .getConnectContext()
-                                .getSessionVariable()
-                                .getMaxMsgSizeOfResultReceiver(),
+                        coordinatorContext.timeoutDeadline.get(),
+                        coordinatorContext.connectContext.getSessionVariable().getMaxMsgSizeOfResultReceiver(),
                         enableParallelResultSink
                 )
         );
@@ -167,8 +157,8 @@ public RowBatch getNext() throws UserException, TException, RpcException {
         // if this query is a block query do not cancel.
         boolean hasLimit = limitRows > 0;
-        if (!coordinatorContext.planner.isBlockQuery()
-                && coordinatorContext.instanceNum > 1
+        if (!coordinatorContext.isBlockQuery
+                && coordinatorContext.instanceNum.get() > 1
                 && hasLimit && numReceivedRows >= limitRows) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("no block query, return num >= limit rows, need cancel");
             }

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
index 2776a3db41a104..42cf08fb2e3b18 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
@@ -17,7 +17,6 @@
 package org.apache.doris.qe.runtime;

-import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
 import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
 import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
@@ -112,13 +111,12 @@ public void setRuntimeFilterThriftParams(TRuntimeFilterParams runtimeFilterParam
     }

     public static RuntimeFiltersThriftBuilder compute(
-            NereidsPlanner planner, List<PipelineDistributedPlan> distributedPlans) {
+            List<RuntimeFilter> runtimeFilters, List<PipelineDistributedPlan> distributedPlans) {
         PipelineDistributedPlan topMostPlan = distributedPlans.get(distributedPlans.size() - 1);
         AssignedJob mergeInstance = topMostPlan.getInstanceJobs().get(0);
         BackendWorker worker = (BackendWorker) mergeInstance.getAssignedWorker();
         TNetworkAddress mergeAddress = new TNetworkAddress(worker.host(), worker.brpcPort());

-        List<RuntimeFilter> runtimeFilters = planner.getRuntimeFilters();
         Set broadcastRuntimeFilterIds = runtimeFilters
                 .stream()
                 .filter(RuntimeFilter::isBroadcast)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/SqlPipelineTask.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/SqlPipelineTask.java
index 12b49a36dc9b82..8cf3f58cfaa2a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/SqlPipelineTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/SqlPipelineTask.java
@@ -74,7 +74,7 @@ public SqlPipelineTask(
         super(new ChildrenRuntimeTasks<>(fragmentTasks));
         this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null");
         this.backendServiceProxy = Objects.requireNonNull(backendServiceProxy, "backendServiceProxy can not be null");
-        this.timeoutDeadline = coordinatorContext.timeoutDeadline;
+        this.timeoutDeadline = coordinatorContext.timeoutDeadline.get();

         // flatten to fragment tasks to quickly index by BackendFragmentId, when receive the report message
         ImmutableMap.Builder backendFragmentTasks
@@ -95,7 +95,7 @@ public SqlPipelineTask(
     public void execute() throws Exception {
         coordinatorContext.withLock(() -> {
             sendAndWaitPhaseOneRpc();
-            if (coordinatorContext.twoPhaseExecution) {
+            if (coordinatorContext.twoPhaseExecution()) {
                 sendAndWaitPhaseTwoRpc();
             }
             return null;
@@ -118,7 +118,7 @@ private void sendAndWaitPhaseOneRpc() throws UserException, RpcException {
             rpcs.add(new RpcInfo(
                     fragmentsTask,
                     DateTime.now().getMillis(),
-                    fragmentsTask.sendPhaseOneRpc(coordinatorContext.twoPhaseExecution))
+                    fragmentsTask.sendPhaseOneRpc(coordinatorContext.twoPhaseExecution()))
             );
         }
         Map<TNetworkAddress, List<Long>> rpcPhase1Latency = waitPipelineRpc(rpcs,
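Note that twoPhaseExecution changed from a boolean captured when the context was built to a method, so both call sites above now recompute it from the live distributedPlans list. The protocol it selects, annotated (the calls are the real ones from execute() above; the RPC names follow the comment in SqlCoordinatorContext):

    // distributedPlans.size() > 1 selects two RPC phases:
    sendAndWaitPhaseOneRpc();          // exec_plan_fragments_prepare: ship all fragments, hold execution
    if (coordinatorContext.twoPhaseExecution()) {
        sendAndWaitPhaseTwoRpc();      // exec_plan_fragments_start: every receiver exists, start senders
    }
    // With a single fragment, phase one alone behaves like a plain exec_plan_fragments call.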
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 1602d1272bd171..0b578266a7af7e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -67,6 +67,7 @@
 import org.apache.logging.log4j.Logger;

 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -79,21 +80,17 @@ public class ThriftPlansBuilder {
     private static final Logger LOG = LogManager.getLogger(ThriftPlansBuilder.class);

     public static Map<DistributedPlanWorker, TPipelineFragmentParamsList> plansToThrift(
-            SqlCoordinatorContext coordinatorContext) {
-        List distributedPlans = coordinatorContext.planner.getDistributedPlans().valueList();
+            SqlCoordinatorContext sqlCoordinatorContext) {

-        // we should set runtime predicate first, then we can use heap sort and to thrift
-        setRuntimePredicateIfNeed(coordinatorContext);
+        List<PipelineDistributedPlan> distributedPlans = sqlCoordinatorContext.distributedPlans;

-        return plansToThrift(distributedPlans, coordinatorContext);
-    }
-
-    private static Map plansToThrift(
-            List distributedPlans, SqlCoordinatorContext coordinatorContext) {
+        // we should set the runtime predicates first, then we can use heap sort when converting to thrift
+        setRuntimePredicateIfNeed(sqlCoordinatorContext.scanNodes);

-        RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder
-                = RuntimeFiltersThriftBuilder.compute(coordinatorContext.planner, distributedPlans);
-        Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier = topNFilterToThrift(coordinatorContext);
+        RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder = RuntimeFiltersThriftBuilder.compute(
+                sqlCoordinatorContext.runtimeFilters, distributedPlans);
+        Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier
+                = topNFilterToThrift(sqlCoordinatorContext.topnFilters);

         Multiset<DistributedPlanWorker> workerProcessInstanceNum = computeInstanceNumPerWorker(distributedPlans);
         Map fragmentsGroupByWorker = Maps.newLinkedHashMap();
@@ -111,7 +108,7 @@ private static Map plansToTh
             TPipelineFragmentParams currentFragmentParam = fragmentToThriftIfAbsent(
                     currentFragmentPlan, instanceJob, workerToCurrentFragment,
                     instancesPerWorker, exchangeSenderNum, sharedFileScanRangeParams,
-                    workerProcessInstanceNum, coordinatorContext);
+                    workerProcessInstanceNum, sqlCoordinatorContext);

             TPipelineInstanceParams instanceParam = instanceToThrift(
                     currentFragmentParam, instanceJob, runtimeFiltersThriftBuilder,
@@ -165,8 +162,8 @@ private static Map plansToTh
         return workerToInstances;
     }

-    private static void setRuntimePredicateIfNeed(SqlCoordinatorContext coordinatorContext) {
-        for (ScanNode scanNode : coordinatorContext.planner.getScanNodes()) {
+    private static void setRuntimePredicateIfNeed(Collection<ScanNode> scanNodes) {
+        for (ScanNode scanNode : scanNodes) {
             if (scanNode instanceof OlapScanNode) {
                 for (SortNode topnFilterSortNode : scanNode.getTopnFilterSortNodes()) {
                     topnFilterSortNode.setHasRuntimePredicate();
@@ -175,9 +172,8 @@ private static void setRuntimePredicateIfNeed(SqlCoordinatorC
         }
     }

-    private static Supplier<List<TTopnFilterDesc>> topNFilterToThrift(SqlCoordinatorContext coordinatorContext) {
+    private static Supplier<List<TTopnFilterDesc>> topNFilterToThrift(List<TopnFilter> topnFilters) {
         return Suppliers.memoize(() -> {
-            List topnFilters = coordinatorContext.planner.getTopnFilters();
             if (CollectionUtils.isEmpty(topnFilters)) {
                 return null;
             }
@@ -315,7 +311,7 @@ private static TPipelineFragmentParams fragmentToThriftIfAbsent(
         // Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons.
         params.setFragmentNumOnHost(workerProcessInstanceNum.count(worker));
-        params.setNeedWaitExecutionTrigger(coordinatorContext.twoPhaseExecution);
+        params.setNeedWaitExecutionTrigger(coordinatorContext.twoPhaseExecution());
         params.setPerExchNumSenders(exchangeSenderNum);

         List nonMultiCastDestinations;
@@ -332,12 +328,14 @@ private static TPipelineFragmentParams fragmentToThriftIfAbsent(
         params.setTotalInstances(instanceNumInThisFragment);
         params.setCoord(coordinatorContext.coordinatorAddress);
-        params.setCurrentConnectFe(coordinatorContext.directConnectFrontendAddress);
+        params.setCurrentConnectFe(coordinatorContext.directConnectFrontendAddress.get());
         params.setQueryGlobals(coordinatorContext.queryGlobals);
         params.setQueryOptions(new TQueryOptions(coordinatorContext.queryOptions));
         long memLimit = coordinatorContext.queryOptions.getMemLimit();
         // update memory limit for colocate join
-        if (!connectContext.getSessionVariable().isDisableColocatePlan() && fragment.hasColocatePlanNode()) {
+        if (connectContext != null
+                && !connectContext.getSessionVariable().isDisableColocatePlan()
+                && fragment.hasColocatePlanNode()) {
             int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceNumInThisFragment);
             memLimit = coordinatorContext.queryOptions.getMemLimit() / rate;
         }
@@ -364,6 +362,7 @@ private static TPipelineFragmentParams fragmentToThriftIfAbsent(
         List instances = instancesPerWorker.get(worker);
         Map instanceToIndex = instanceToIndex(instances);

+        // local shuffle params: bucket_seq_to_instance_idx and shuffle_idx_to_instance_idx
         params.setBucketSeqToInstanceIdx(computeBucketIdToInstanceId(fragmentPlan, w, instanceToIndex));
         params.setShuffleIdxToInstanceIdx(computeDestIdToInstanceId(fragmentPlan, w, instanceToIndex));
         return params;
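Finally, setFragmentNumOnHost above is fed by computeInstanceNumPerWorker, whose result is the Guava Multiset threaded through plansToThrift. The counting idea as a standalone sketch (illustrative variable names; getInstanceJobs and getAssignedWorker are the real accessors used elsewhere in this patch):

    import com.google.common.collect.HashMultiset;
    import com.google.common.collect.Multiset;

    // Count how many instances land on each worker across all fragments; the
    // count for a worker becomes the fragment_num_on_host value in its thrift params.
    Multiset<DistributedPlanWorker> instancesPerWorker = HashMultiset.create();
    for (PipelineDistributedPlan plan : distributedPlans) {
        for (AssignedJob job : plan.getInstanceJobs()) {
            instancesPerWorker.add(job.getAssignedWorker());
        }
    }
    int fragmentNumOnHost = instancesPerWorker.count(worker);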