diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 385b581d2a5725..3750d8b40b40e2 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -268,7 +268,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, } size_t raw_bytes_threshold = config::doris_scanner_row_bytes; - size_t raw_bytes_read = 0; bool first_read = true; + size_t raw_bytes_read = 0; bool first_read = true; int64_t limit = scanner->limit(); while (!eos && raw_bytes_read < raw_bytes_threshold) { if (UNLIKELY(ctx->done())) { eos = true; @@ -322,6 +322,17 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, ctx->inc_block_usage(free_block->allocated_bytes()); scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes); } + if (limit > 0 && limit < ctx->batch_size()) { + // If this scanner has limit, and less than batch size, + // return immediately and no need to wait raw_bytes_threshold. + // This can save time that each scanner may only return a small number of rows, + // but rows are enough from all scanners. + // If not break, the query like "select * from tbl where id=1 limit 10" + // may scan a lot data when the "id=1"'s filter ratio is high. + // If limit is larger than batch size, this rule is skipped, + // to avoid user specify a large limit and causing too much small blocks. + break; + } } // end for while if (UNLIKELY(!status.ok())) { diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 6c4f3294ce1bcc..bb68055e1f07a3 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -156,6 +156,8 @@ class VScanner { _query_statistics = query_statistics; } + int64_t limit() const { return _limit; } + protected: void _discard_conjuncts() { for (auto& conjunct : _conjuncts) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index d365ff912de334..c9985911670878 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -719,11 +719,6 @@ public String getExplainString(ExplainOptions explainOptions) { return plan; } - @Override - public boolean isBlockQuery() { - return true; - } - @Override public DescriptorTable getDescTable() { return descTable; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index c8ef2e22662b4a..f78b0735a1e83f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -79,10 +79,6 @@ public OriginalPlanner(Analyzer analyzer) { this.analyzer = analyzer; } - public boolean isBlockQuery() { - return isBlockQuery; - } - public PlannerContext getPlannerContext() { return plannerContext; } @@ -276,17 +272,6 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue if (queryStmt instanceof SelectStmt) { SelectStmt selectStmt = (SelectStmt) queryStmt; - if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) { - isBlockQuery = true; - if (LOG.isDebugEnabled()) { - LOG.debug("this is block query"); - } - } else { - isBlockQuery = false; - if (LOG.isDebugEnabled()) { - LOG.debug("this isn't block query"); - } - } if (selectStmt.isTwoPhaseReadOptEnabled()) { // Optimize query like `SELECT ... FROM WHERE ... ORDER BY ... LIMIT ...` if (singleNodePlan instanceof SortNode diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java index 5617ad57e8f66e..cfcd27af8fa5d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java @@ -44,8 +44,6 @@ public abstract class Planner { protected ArrayList fragments = Lists.newArrayList(); - protected boolean isBlockQuery = false; - protected TQueryOptions queryOptions; public abstract List getScanNodes(); @@ -116,10 +114,6 @@ public List getFragments() { return fragments; } - public boolean isBlockQuery() { - return isBlockQuery; - } - public TQueryOptions getQueryOptions() { return queryOptions; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 78493a46ad192a..0f6bc0212d1f89 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -234,8 +234,6 @@ public class Coordinator implements CoordInterface { // same as backend_exec_states_.size() after Exec() private final Set instanceIds = Sets.newHashSet(); - private final boolean isBlockQuery; - private int numReceivedRows = 0; private List deltaUrls; @@ -331,7 +329,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner, // Used for query/insert/test public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.context = context; - this.isBlockQuery = planner.isBlockQuery(); this.queryId = context.queryId(); this.fragments = planner.getFragments(); this.scanNodes = planner.getScanNodes(); @@ -374,7 +371,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { // Constructor of Coordinator is too complicated. public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List fragments, List scanNodes, String timezone, boolean loadZeroTolerance, boolean enableProfile) { - this.isBlockQuery = true; this.jobId = jobId; this.queryId = queryId; this.descTable = descTable.toThrift(); @@ -1206,23 +1202,22 @@ public RowBatch getNext() throws Exception { numReceivedRows += resultBatch.getBatch().getRowsSize(); } + // if reached limit rows, cancel this query immediately + // to avoid BE from reading more data. + // ATTN: if change here, also need to change the same logic in QueryProcessor.getNext(); + Long limitRows = fragments.get(0).getPlanRoot().getLimit(); + boolean reachedLimit = LimitUtils.cancelIfReachLimit( + resultBatch, limitRows, numReceivedRows, this::cancelInternal); + if (resultBatch.isEos()) { receivers.remove(receiver); if (receivers.isEmpty()) { returnedAllResults = true; - } else { + } else if (!reachedLimit) { + // if reachedLimit is true, which means this query has been cancelled. + // so no need to set eos to false again. resultBatch.setEos(false); } - - // if this query is a block query do not cancel. - Long numLimitRows = fragments.get(0).getPlanRoot().getLimit(); - boolean hasLimit = numLimitRows > 0; - if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) { - if (LOG.isDebugEnabled()) { - LOG.debug("no block query, return num >= limit rows, need cancel"); - } - cancelInternal(new Status(TStatusCode.LIMIT_REACH, "query reach limit")); - } } if (!returnedAllResults) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java b/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java new file mode 100644 index 00000000000000..cbbe5c71a0ff06 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.common.Status; +import org.apache.doris.thrift.TStatusCode; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.function.Consumer; + +/** + * This is a utility class for limit related operations. + * Because current there are 2 places need to check limit rows, so put the logic here for unification. + * - Coordinator.getNext(); + * - QueryProcessor.getNext(); + */ +public class LimitUtils { + private static final Logger LOG = LogManager.getLogger(LimitUtils.class); + private static final Status LIMIT_REACH_STATUS = new Status(TStatusCode.LIMIT_REACH, "query reach limit"); + + // if reached limit rows, cancel this query immediately + // to avoid BE from reading more data. + public static boolean cancelIfReachLimit(RowBatch resultBatch, long limitRows, long numReceivedRows, + Consumer cancelFunc) { + boolean reachedLimit = false; + if (limitRows > 0 && numReceivedRows >= limitRows) { + if (LOG.isDebugEnabled()) { + LOG.debug("reach limit rows: {}, received rows: {}, cancel query", limitRows, numReceivedRows); + } + cancelFunc.accept(LIMIT_REACH_STATUS); + // set this + resultBatch.setEos(true); + reachedLimit = true; + } + return reachedLimit; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java new file mode 100644 index 00000000000000..012fbad18a5ddb --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + + +import org.apache.doris.common.Status; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.function.Consumer; + +public class LimitUtilsTest { + + private static int res = 0; + + @Test + public void testUpperBound() { + Consumer cancelFunc = batch -> res = 666; + RowBatch rowBatch = new RowBatch(); + rowBatch.setEos(false); + // - no limit + Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 0, 10, cancelFunc)); + Assert.assertFalse(rowBatch.isEos()); + Assert.assertEquals(0, res); + + // - not reach limit + Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 10, 1, cancelFunc)); + Assert.assertFalse(rowBatch.isEos()); + Assert.assertEquals(0, res); + + // - reach limit + Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 10, cancelFunc)); + Assert.assertTrue(rowBatch.isEos()); + Assert.assertEquals(666, res); + + // - reach limit + res = 0; + rowBatch.setEos(false); + Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 100, cancelFunc)); + Assert.assertTrue(rowBatch.isEos()); + Assert.assertEquals(666, res); + } +}