Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](planner) query should be cancelled if limit reached (#44338) #45223

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
}

size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
size_t raw_bytes_read = 0; bool first_read = true;
size_t raw_bytes_read = 0; bool first_read = true; int64_t limit = scanner->limit();
while (!eos && raw_bytes_read < raw_bytes_threshold) {
if (UNLIKELY(ctx->done())) {
eos = true;
Expand Down Expand Up @@ -322,6 +322,17 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
ctx->inc_block_usage(free_block->allocated_bytes());
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
}
if (limit > 0 && limit < ctx->batch_size()) {
// If this scanner has a limit that is smaller than the batch size,
// return immediately instead of waiting to reach raw_bytes_threshold.
// This saves time when each scanner returns only a small number of rows
// but the rows from all scanners together are already enough.
// Without this break, a query like "select * from tbl where id=1 limit 10"
// may scan a lot of data when the filter ratio of "id=1" is high.
// If the limit is larger than the batch size, this rule is skipped,
// so that a large user-specified limit does not produce too many small blocks.
break;
}
} // end for while

if (UNLIKELY(!status.ok())) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vscanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ class VScanner {
_query_statistics = query_statistics;
}

// Returns this scanner's limit value (_limit).
int64_t limit() const { return _limit; }

protected:
void _discard_conjuncts() {
for (auto& conjunct : _conjuncts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,11 +719,6 @@ public String getExplainString(ExplainOptions explainOptions) {
return plan;
}

@Override
public boolean isBlockQuery() {
return true;
}

@Override
public DescriptorTable getDescTable() {
return descTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,6 @@ public OriginalPlanner(Analyzer analyzer) {
this.analyzer = analyzer;
}

public boolean isBlockQuery() {
return isBlockQuery;
}

/** Returns the {@code PlannerContext} held in the {@code plannerContext} field. */
public PlannerContext getPlannerContext() {
return plannerContext;
}
Expand Down Expand Up @@ -276,17 +272,6 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue

if (queryStmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) {
isBlockQuery = true;
if (LOG.isDebugEnabled()) {
LOG.debug("this is block query");
}
} else {
isBlockQuery = false;
if (LOG.isDebugEnabled()) {
LOG.debug("this isn't block query");
}
}
if (selectStmt.isTwoPhaseReadOptEnabled()) {
// Optimize query like `SELECT ... FROM <tbl> WHERE ... ORDER BY ... LIMIT ...`
if (singleNodePlan instanceof SortNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public abstract class Planner {

protected ArrayList<PlanFragment> fragments = Lists.newArrayList();

protected boolean isBlockQuery = false;

protected TQueryOptions queryOptions;

public abstract List<ScanNode> getScanNodes();
Expand Down Expand Up @@ -116,10 +114,6 @@ public List<PlanFragment> getFragments() {
return fragments;
}

public boolean isBlockQuery() {
return isBlockQuery;
}

/** Returns the {@code TQueryOptions} held in the {@code queryOptions} field. */
public TQueryOptions getQueryOptions() {
return queryOptions;
}
Expand Down
25 changes: 10 additions & 15 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,6 @@ public class Coordinator implements CoordInterface {
// same as backend_exec_states_.size() after Exec()
private final Set<TUniqueId> instanceIds = Sets.newHashSet();

private final boolean isBlockQuery;

private int numReceivedRows = 0;

private List<String> deltaUrls;
Expand Down Expand Up @@ -331,7 +329,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner,
// Used for query/insert/test
public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
this.context = context;
this.isBlockQuery = planner.isBlockQuery();
this.queryId = context.queryId();
this.fragments = planner.getFragments();
this.scanNodes = planner.getScanNodes();
Expand Down Expand Up @@ -374,7 +371,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
// Constructor of Coordinator is too complicated.
public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List<PlanFragment> fragments,
List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance, boolean enableProfile) {
this.isBlockQuery = true;
this.jobId = jobId;
this.queryId = queryId;
this.descTable = descTable.toThrift();
Expand Down Expand Up @@ -1206,23 +1202,22 @@ public RowBatch getNext() throws Exception {
numReceivedRows += resultBatch.getBatch().getRowsSize();
}

// if reached limit rows, cancel this query immediately
// to avoid BE from reading more data.
// ATTN: if change here, also need to change the same logic in QueryProcessor.getNext();
Long limitRows = fragments.get(0).getPlanRoot().getLimit();
boolean reachedLimit = LimitUtils.cancelIfReachLimit(
resultBatch, limitRows, numReceivedRows, this::cancelInternal);

if (resultBatch.isEos()) {
receivers.remove(receiver);
if (receivers.isEmpty()) {
returnedAllResults = true;
} else {
} else if (!reachedLimit) {
// If reachedLimit is true, this query has already been cancelled,
// so there is no need to set eos back to false.
resultBatch.setEos(false);
}

// if this query is a block query do not cancel.
Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
boolean hasLimit = numLimitRows > 0;
if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) {
if (LOG.isDebugEnabled()) {
LOG.debug("no block query, return num >= limit rows, need cancel");
}
cancelInternal(new Status(TStatusCode.LIMIT_REACH, "query reach limit"));
}
}

if (!returnedAllResults) {
Expand Down
54 changes: 54 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.common.Status;
import org.apache.doris.thrift.TStatusCode;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.function.Consumer;

/**
 * Utility methods for limit-related operations.
 *
 * <p>The row limit currently has to be checked in two places, so the logic is kept here
 * to make sure both behave the same way:
 * <ul>
 *   <li>Coordinator.getNext()</li>
 *   <li>QueryProcessor.getNext()</li>
 * </ul>
 */
public class LimitUtils {
    private static final Logger LOG = LogManager.getLogger(LimitUtils.class);
    private static final Status LIMIT_REACH_STATUS = new Status(TStatusCode.LIMIT_REACH, "query reach limit");

    // This class only exposes static helpers and is not meant to be instantiated.
    private LimitUtils() {
    }

    /**
     * Cancels the query as soon as the number of received rows has reached the limit,
     * to avoid BE from reading more data.
     *
     * <p>When the limit is reached, {@code cancelFunc} is invoked with a
     * {@code TStatusCode.LIMIT_REACH} status and {@code resultBatch} is marked as eos.
     * Otherwise neither argument is touched.
     *
     * @param resultBatch the batch currently being returned; its eos flag is set to
     *         {@code true} when the limit is reached
     * @param limitRows the row limit; a value {@code <= 0} means no limit is enforced
     * @param numReceivedRows the number of rows received so far
     * @param cancelFunc the callback used to cancel the query
     * @return {@code true} if the limit was reached and the query was cancelled,
     *         {@code false} otherwise
     */
    public static boolean cancelIfReachLimit(RowBatch resultBatch, long limitRows, long numReceivedRows,
            Consumer<Status> cancelFunc) {
        // No limit configured, or not enough rows received yet: nothing to do.
        if (limitRows <= 0 || numReceivedRows < limitRows) {
            return false;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("reach limit rows: {}, received rows: {}, cancel query", limitRows, numReceivedRows);
        }
        cancelFunc.accept(LIMIT_REACH_STATUS);
        // The query has just been asked to cancel, so mark this batch as the last one.
        resultBatch.setEos(true);
        return true;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;


import org.apache.doris.common.Status;

import org.junit.Assert;
import org.junit.Test;

import java.util.function.Consumer;

/**
 * Unit test for {@link LimitUtils#cancelIfReachLimit}.
 */
public class LimitUtilsTest {

    /**
     * Checks the three outcomes of the limit check: no limit configured, limit not yet
     * reached, and limit reached (exactly at the limit and beyond it).
     */
    @Test
    public void testUpperBound() {
        // Counts how many times the cancel callback has been invoked. A one-element array is
        // used so the lambda can mutate it without relying on shared static state.
        int[] cancelCalls = {0};
        Consumer<Status> cancelFunc = status -> cancelCalls[0]++;
        RowBatch rowBatch = new RowBatch();
        rowBatch.setEos(false);

        // - no limit: nothing is cancelled and eos is left untouched
        Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 0, 10, cancelFunc));
        Assert.assertFalse(rowBatch.isEos());
        Assert.assertEquals(0, cancelCalls[0]);

        // - not reach limit: nothing is cancelled and eos is left untouched
        Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 10, 1, cancelFunc));
        Assert.assertFalse(rowBatch.isEos());
        Assert.assertEquals(0, cancelCalls[0]);

        // - reach limit exactly: the query is cancelled once and the batch is marked eos
        Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 10, cancelFunc));
        Assert.assertTrue(rowBatch.isEos());
        Assert.assertEquals(1, cancelCalls[0]);

        // - received more rows than the limit: same outcome as reaching it exactly
        cancelCalls[0] = 0;
        rowBatch.setEos(false);
        Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 100, cancelFunc));
        Assert.assertTrue(rowBatch.isEos());
        Assert.assertEquals(1, cancelCalls[0]);
    }
}
Loading