diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 7c5aa8db0a77af..f419f58037aab6 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -268,7 +268,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, } size_t raw_bytes_threshold = config::doris_scanner_row_bytes; - size_t raw_bytes_read = 0; bool first_read = true; + size_t raw_bytes_read = 0; bool first_read = true; int64_t limit = scanner->limit(); while (!eos && raw_bytes_read < raw_bytes_threshold) { if (UNLIKELY(ctx->done())) { eos = true; @@ -316,6 +316,17 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, ctx->inc_block_usage(free_block->allocated_bytes()); scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes); } + if (limit > 0 && limit < ctx->batch_size()) { + // If this scanner has a limit that is smaller than the batch size, + // return immediately instead of waiting to reach raw_bytes_threshold. + // This saves time when each scanner only needs to return a small number of rows + // and the rows from all scanners together are already enough. + // Without this break, a query like "select * from tbl where id=1 limit 10" + // may scan a lot of data when the filter ratio of "id=1" is high. + // If the limit is not smaller than the batch size, this rule is skipped, + // so that a user-specified large limit does not produce too many small blocks. 
+ break; + } } // end for while if (UNLIKELY(!status.ok())) { diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 6c4f3294ce1bcc..bb68055e1f07a3 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -156,6 +156,8 @@ class VScanner { _query_statistics = query_statistics; } + int64_t limit() const { return _limit; } + protected: void _discard_conjuncts() { for (auto& conjunct : _conjuncts) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 58af5cd3e92199..16fe1353facfb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -722,11 +722,6 @@ public String getExplainString(ExplainOptions explainOptions) { return plan; } - @Override - public boolean isBlockQuery() { - return true; - } - @Override public DescriptorTable getDescTable() { return descTable; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index 503cb181766113..feb8c45787a013 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -79,10 +79,6 @@ public OriginalPlanner(Analyzer analyzer) { this.analyzer = analyzer; } - public boolean isBlockQuery() { - return isBlockQuery; - } - public PlannerContext getPlannerContext() { return plannerContext; } @@ -276,17 +272,6 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue if (queryStmt instanceof SelectStmt) { SelectStmt selectStmt = (SelectStmt) queryStmt; - if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) { - isBlockQuery = true; - if (LOG.isDebugEnabled()) { - LOG.debug("this is block query"); - } - } else { - 
isBlockQuery = false; - if (LOG.isDebugEnabled()) { - LOG.debug("this isn't block query"); - } - } if (selectStmt.isTwoPhaseReadOptEnabled()) { // Optimize query like `SELECT ... FROM WHERE ... ORDER BY ... LIMIT ...` if (singleNodePlan instanceof SortNode diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java index 5617ad57e8f66e..cfcd27af8fa5d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java @@ -44,8 +44,6 @@ public abstract class Planner { protected ArrayList fragments = Lists.newArrayList(); - protected boolean isBlockQuery = false; - protected TQueryOptions queryOptions; public abstract List getScanNodes(); @@ -116,10 +114,6 @@ public List getFragments() { return fragments; } - public boolean isBlockQuery() { - return isBlockQuery; - } - public TQueryOptions getQueryOptions() { return queryOptions; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 262b5836689ab0..4905050e6e8cfe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -235,8 +235,6 @@ public class Coordinator implements CoordInterface { // same as backend_exec_states_.size() after Exec() private final Set instanceIds = Sets.newHashSet(); - private final boolean isBlockQuery; - private int numReceivedRows = 0; private List deltaUrls; @@ -332,7 +330,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner, // Used for query/insert/test public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.context = context; - this.isBlockQuery = planner.isBlockQuery(); this.queryId = context.queryId(); this.fragments = planner.getFragments(); this.scanNodes = planner.getScanNodes(); @@ -375,7 
+372,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { // Constructor of Coordinator is too complicated. public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List fragments, List scanNodes, String timezone, boolean loadZeroTolerance, boolean enableProfile) { - this.isBlockQuery = true; this.jobId = jobId; this.queryId = queryId; this.descTable = descTable.toThrift(); @@ -1202,23 +1198,22 @@ public RowBatch getNext() throws Exception { numReceivedRows += resultBatch.getBatch().getRowsSize(); } + // If the number of received rows has reached the limit, cancel this query immediately + // to prevent BE from reading more data. + // ATTN: if you change this, also change the same logic in QueryProcessor.getNext(); + Long limitRows = fragments.get(0).getPlanRoot().getLimit(); + boolean reachedLimit = LimitUtils.cancelIfReachLimit( + resultBatch, limitRows, numReceivedRows, this::cancelInternal); + if (resultBatch.isEos()) { receivers.remove(receiver); if (receivers.isEmpty()) { returnedAllResults = true; - } else { + } else if (!reachedLimit) { + // If reachedLimit is true, this query has already been cancelled, + // so there is no need to reset eos to false. resultBatch.setEos(false); } - - // if this query is a block query do not cancel. 
- Long numLimitRows = fragments.get(0).getPlanRoot().getLimit(); - boolean hasLimit = numLimitRows > 0; - if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) { - if (LOG.isDebugEnabled()) { - LOG.debug("no block query, return num >= limit rows, need cancel"); - } - cancelInternal(new Status(TStatusCode.LIMIT_REACH, "query reach limit")); - } } if (!returnedAllResults) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java index aed0fd9c98ce71..343456c296a79b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java @@ -84,7 +84,6 @@ public class CoordinatorContext { // these are some constant parameters public final NereidsCoordinator coordinator; public final List fragments; - public final boolean isBlockQuery; public final DataSink dataSink; public final ExecutionProfile executionProfile; public final ConnectContext connectContext; @@ -120,7 +119,6 @@ public class CoordinatorContext { private CoordinatorContext( NereidsCoordinator coordinator, ConnectContext connectContext, - boolean isBlockQuery, List distributedPlans, List fragments, List runtimeFilters, @@ -131,7 +129,6 @@ private CoordinatorContext( TQueryOptions queryOptions, TDescriptorTable descriptorTable) { this.connectContext = connectContext; - this.isBlockQuery = isBlockQuery; this.fragments = fragments; this.distributedPlans = distributedPlans; this.topDistributedPlan = distributedPlans.get(distributedPlans.size() - 1); @@ -161,7 +158,6 @@ private CoordinatorContext( TDescriptorTable descriptorTable, ExecutionProfile executionProfile) { this.coordinator = coordinator; - this.isBlockQuery = true; this.fragments = fragments; this.distributedPlans = distributedPlans; this.topDistributedPlan = distributedPlans.get(distributedPlans.size() - 1); @@ -290,7 +286,7 @@ public 
static CoordinatorContext buildForSql(NereidsPlanner planner, NereidsCoor .collect(Collectors.toList()) ); return new CoordinatorContext( - coordinator, connectContext, planner.isBlockQuery(), + coordinator, connectContext, planner.getDistributedPlans().valueList(), planner.getFragments(), planner.getRuntimeFilters(), planner.getTopnFilters(), planner.getScanNodes(), executionProfile, queryGlobals, queryOptions, descriptorTable diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java b/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java new file mode 100644 index 00000000000000..cbbe5c71a0ff06 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.common.Status; +import org.apache.doris.thrift.TStatusCode; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.function.Consumer; + +/** + * This is a utility class for limit-related operations. + * Currently two places need to check the row limit, so the shared logic lives here: 
+ * - Coordinator.getNext(); + * - QueryProcessor.getNext(); + */ +public class LimitUtils { + private static final Logger LOG = LogManager.getLogger(LimitUtils.class); + private static final Status LIMIT_REACH_STATUS = new Status(TStatusCode.LIMIT_REACH, "query reach limit"); + + // If limitRows > 0 and numReceivedRows has reached it, cancel the query via cancelFunc and mark + // resultBatch as eos, to prevent BE from reading more data. Returns true iff the query was cancelled. + public static boolean cancelIfReachLimit(RowBatch resultBatch, long limitRows, long numReceivedRows, + Consumer cancelFunc) { + boolean reachedLimit = false; + if (limitRows > 0 && numReceivedRows >= limitRows) { + if (LOG.isDebugEnabled()) { + LOG.debug("reach limit rows: {}, received rows: {}, cancel query", limitRows, numReceivedRows); + } + cancelFunc.accept(LIMIT_REACH_STATUS); + // the query has just been cancelled, so mark this batch as the end of the stream (eos) + resultBatch.setEos(true); + reachedLimit = true; + } + return reachedLimit; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/MultiFragmentsPipelineTask.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/MultiFragmentsPipelineTask.java index a1a91aab5176b8..92b2a00597b2e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/MultiFragmentsPipelineTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/MultiFragmentsPipelineTask.java @@ -130,9 +130,10 @@ public void onSuccess(InternalService.PCancelPlanFragmentResult result) { LOG.warn("Failed to cancel query {} backend: {}, reason: {}", DebugUtil.printId(queryId), backend, status.toString()); } + } else { + LOG.warn("Failed to cancel query {} backend: {} reason: {}", + DebugUtil.printId(queryId), backend, "without status"); } - LOG.warn("Failed to cancel query {} backend: {} reason: {}", - DebugUtil.printId(queryId), backend, "without status"); } public void onFailure(Throwable t) { diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java index a5a5100faece1a..6eb19d250ede4c 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java @@ -28,12 +28,12 @@ import org.apache.doris.qe.AbstractJobProcessor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.CoordinatorContext; +import org.apache.doris.qe.LimitUtils; import org.apache.doris.qe.ResultReceiver; import org.apache.doris.qe.RowBatch; import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TReportExecStatusParams; -import org.apache.doris.thrift.TStatusCode; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -64,7 +64,7 @@ public QueryProcessor(CoordinatorContext coordinatorContext, List 0; - if (!coordinatorContext.isBlockQuery - && coordinatorContext.instanceNum.get() > 1 - && hasLimit && numReceivedRows >= limitRows) { - if (LOG.isDebugEnabled()) { - LOG.debug("no block query, return num >= limit rows, need cancel"); - } - coordinatorContext.cancelSchedule(new Status(TStatusCode.LIMIT_REACH, "query reach limit")); - } } if (!runningReceivers.isEmpty()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java new file mode 100644 index 00000000000000..012fbad18a5ddb --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + + +import org.apache.doris.common.Status; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.function.Consumer; + +public class LimitUtilsTest { + + private static int res = 0; + + @Test + public void testUpperBound() { + Consumer cancelFunc = batch -> res = 666; + RowBatch rowBatch = new RowBatch(); + rowBatch.setEos(false); + // - no limit (limitRows == 0): nothing is cancelled + Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 0, 10, cancelFunc)); + Assert.assertFalse(rowBatch.isEos()); + Assert.assertEquals(0, res); + + // - limit not reached (1 < 10): nothing is cancelled + Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 10, 1, cancelFunc)); + Assert.assertFalse(rowBatch.isEos()); + Assert.assertEquals(0, res); + + // - limit reached exactly (10 == 10): cancelFunc runs and eos is set + Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 10, cancelFunc)); + Assert.assertTrue(rowBatch.isEos()); + Assert.assertEquals(666, res); + + // - limit exceeded (100 > 10): cancelFunc runs and eos is set again after the reset below + res = 0; + rowBatch.setEos(false); + Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 100, cancelFunc)); + Assert.assertTrue(rowBatch.isEos()); + Assert.assertEquals(666, res); + } +}