Skip to content

Commit

Permalink
[fix](planner) query should be cancelled if limit reached (#44338) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman authored Dec 10, 2024
1 parent 4100a75 commit 49d1671
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 42 deletions.
13 changes: 12 additions & 1 deletion be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
}

size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
size_t raw_bytes_read = 0; bool first_read = true;
size_t raw_bytes_read = 0; bool first_read = true; int64_t limit = scanner->limit();
while (!eos && raw_bytes_read < raw_bytes_threshold) {
if (UNLIKELY(ctx->done())) {
eos = true;
Expand Down Expand Up @@ -322,6 +322,17 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
ctx->inc_block_usage(free_block->allocated_bytes());
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
}
if (limit > 0 && limit < ctx->batch_size()) {
// If this scanner has a limit that is smaller than the batch size,
// return immediately instead of waiting to reach raw_bytes_threshold.
// Each scanner may return only a small number of rows, yet the rows
// from all scanners together may already be enough to satisfy the limit.
// Without this break, a query like "select * from tbl where id=1 limit 10"
// may scan a lot of data when the "id=1" predicate filters out most rows.
// If the limit is larger than the batch size, this rule is skipped,
// so that a user-specified large limit does not produce too many small blocks.
break;
}
} // end for while

if (UNLIKELY(!status.ok())) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vscanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ class VScanner {
_query_statistics = query_statistics;
}

// Returns the value stored in _limit.
// NOTE(review): presumably the row limit of this scanner, where a value <= 0
// means "no limit" -- confirm against where _limit is assigned.
int64_t limit() const { return _limit; }

protected:
void _discard_conjuncts() {
for (auto& conjunct : _conjuncts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,11 +719,6 @@ public String getExplainString(ExplainOptions explainOptions) {
return plan;
}

/**
 * Reports whether the planned query is a block query.
 * This override unconditionally answers {@code true}.
 */
@Override
public boolean isBlockQuery() {
return true;
}

@Override
public DescriptorTable getDescTable() {
return descTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,6 @@ public OriginalPlanner(Analyzer analyzer) {
this.analyzer = analyzer;
}

/**
 * Returns the current value of the {@code isBlockQuery} flag.
 */
public boolean isBlockQuery() {
return isBlockQuery;
}

/**
 * Returns the {@code plannerContext} held by this planner.
 */
public PlannerContext getPlannerContext() {
return plannerContext;
}
Expand Down Expand Up @@ -276,17 +272,6 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue

if (queryStmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) {
isBlockQuery = true;
if (LOG.isDebugEnabled()) {
LOG.debug("this is block query");
}
} else {
isBlockQuery = false;
if (LOG.isDebugEnabled()) {
LOG.debug("this isn't block query");
}
}
if (selectStmt.isTwoPhaseReadOptEnabled()) {
// Optimize query like `SELECT ... FROM <tbl> WHERE ... ORDER BY ... LIMIT ...`
if (singleNodePlan instanceof SortNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public abstract class Planner {

protected ArrayList<PlanFragment> fragments = Lists.newArrayList();

protected boolean isBlockQuery = false;

protected TQueryOptions queryOptions;

public abstract List<ScanNode> getScanNodes();
Expand Down Expand Up @@ -116,10 +114,6 @@ public List<PlanFragment> getFragments() {
return fragments;
}

/**
 * Returns the current value of the {@code isBlockQuery} flag.
 */
public boolean isBlockQuery() {
return isBlockQuery;
}

/**
 * Returns the {@code queryOptions} held by this planner.
 */
public TQueryOptions getQueryOptions() {
return queryOptions;
}
Expand Down
25 changes: 10 additions & 15 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,6 @@ public class Coordinator implements CoordInterface {
// same as backend_exec_states_.size() after Exec()
private final Set<TUniqueId> instanceIds = Sets.newHashSet();

private final boolean isBlockQuery;

private int numReceivedRows = 0;

private List<String> deltaUrls;
Expand Down Expand Up @@ -331,7 +329,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner,
// Used for query/insert/test
public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
this.context = context;
this.isBlockQuery = planner.isBlockQuery();
this.queryId = context.queryId();
this.fragments = planner.getFragments();
this.scanNodes = planner.getScanNodes();
Expand Down Expand Up @@ -374,7 +371,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
// Constructor of Coordinator is too complicated.
public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List<PlanFragment> fragments,
List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance, boolean enableProfile) {
this.isBlockQuery = true;
this.jobId = jobId;
this.queryId = queryId;
this.descTable = descTable.toThrift();
Expand Down Expand Up @@ -1206,23 +1202,22 @@ public RowBatch getNext() throws Exception {
numReceivedRows += resultBatch.getBatch().getRowsSize();
}

// if reached limit rows, cancel this query immediately
// to avoid BE from reading more data.
// ATTN: if change here, also need to change the same logic in QueryProcessor.getNext();
Long limitRows = fragments.get(0).getPlanRoot().getLimit();
boolean reachedLimit = LimitUtils.cancelIfReachLimit(
resultBatch, limitRows, numReceivedRows, this::cancelInternal);

if (resultBatch.isEos()) {
receivers.remove(receiver);
if (receivers.isEmpty()) {
returnedAllResults = true;
} else {
} else if (!reachedLimit) {
// If reachedLimit is true, the query has already been cancelled,
// so there is no need to reset eos to false.
resultBatch.setEos(false);
}

// if this query is a block query do not cancel.
Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
boolean hasLimit = numLimitRows > 0;
if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) {
if (LOG.isDebugEnabled()) {
LOG.debug("no block query, return num >= limit rows, need cancel");
}
cancelInternal(new Status(TStatusCode.LIMIT_REACH, "query reach limit"));
}
}

if (!returnedAllResults) {
Expand Down
54 changes: 54 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.common.Status;
import org.apache.doris.thrift.TStatusCode;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.function.Consumer;

/**
 * Utility class for limit-related operations.
 * The row-limit check is currently needed in two places, so the logic lives here
 * to keep them consistent:
 *   - Coordinator.getNext();
 *   - QueryProcessor.getNext();
 */
public class LimitUtils {
    private static final Logger LOG = LogManager.getLogger(LimitUtils.class);
    private static final Status LIMIT_REACH_STATUS = new Status(TStatusCode.LIMIT_REACH, "query reach limit");

    /**
     * Cancels the query as soon as the number of received rows reaches the limit,
     * so that BE does not keep reading more data.
     *
     * @param resultBatch     the batch just received; marked as eos when the limit is reached
     * @param limitRows       the row limit of the query; a value <= 0 is treated as "no limit"
     * @param numReceivedRows the total number of rows received so far
     * @param cancelFunc      callback invoked with a LIMIT_REACH status to cancel the query
     * @return true if the limit was reached (the callback was invoked and eos was set),
     *         false otherwise (nothing was touched)
     */
    public static boolean cancelIfReachLimit(RowBatch resultBatch, long limitRows, long numReceivedRows,
            Consumer<Status> cancelFunc) {
        final boolean hasLimit = limitRows > 0;
        if (!hasLimit || numReceivedRows < limitRows) {
            // Either there is no limit or it has not been reached yet.
            return false;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("reach limit rows: {}, received rows: {}, cancel query", limitRows, numReceivedRows);
        }
        cancelFunc.accept(LIMIT_REACH_STATUS);
        // Mark this batch as the last one, since the limit has been reached.
        resultBatch.setEos(true);
        return true;
    }
}
59 changes: 59 additions & 0 deletions fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;


import org.apache.doris.common.Status;

import org.junit.Assert;
import org.junit.Test;

import java.util.function.Consumer;

/**
 * Unit test for {@link LimitUtils#cancelIfReachLimit}.
 */
public class LimitUtilsTest {

    // Sentinel written by the cancel callback; it stays 0 until the callback runs.
    private static int cancelMarker = 0;

    @Test
    public void testUpperBound() {
        Consumer<Status> cancelFunc = status -> cancelMarker = 666;
        RowBatch rowBatch = new RowBatch();
        rowBatch.setEos(false);

        // Case 1: no limit (limitRows == 0) -> nothing is cancelled, eos is untouched.
        boolean reached = LimitUtils.cancelIfReachLimit(rowBatch, 0, 10, cancelFunc);
        Assert.assertFalse(reached);
        Assert.assertFalse(rowBatch.isEos());
        Assert.assertEquals(0, cancelMarker);

        // Case 2: limit not reached yet -> nothing is cancelled, eos is untouched.
        reached = LimitUtils.cancelIfReachLimit(rowBatch, 10, 1, cancelFunc);
        Assert.assertFalse(reached);
        Assert.assertFalse(rowBatch.isEos());
        Assert.assertEquals(0, cancelMarker);

        // Case 3: received rows equal the limit -> callback fires and eos is set.
        reached = LimitUtils.cancelIfReachLimit(rowBatch, 10, 10, cancelFunc);
        Assert.assertTrue(reached);
        Assert.assertTrue(rowBatch.isEos());
        Assert.assertEquals(666, cancelMarker);

        // Case 4: received rows exceed the limit -> callback fires and eos is set.
        // Reset the state first so that this case is checked independently.
        cancelMarker = 0;
        rowBatch.setEos(false);
        reached = LimitUtils.cancelIfReachLimit(rowBatch, 10, 100, cancelFunc);
        Assert.assertTrue(reached);
        Assert.assertTrue(rowBatch.isEos());
        Assert.assertEquals(666, cancelMarker);
    }
}

0 comments on commit 49d1671

Please sign in to comment.