Skip to content

Commit

Permalink
[Enhancement] Add big query error in audit log (#49611)
Browse files Browse the repository at this point in the history
Why I'm doing:
When a query exceeds the threshold of big_query_cpu_second_limit or big_query_scan_rows_limit, the query fails.
However, we cannot tell whether a query was killed by one of the big-query thresholds or for some other reason.

What I'm doing:
Add BIG_QUERY_CPU_SECOND_LIMIT_EXCEEDED and BIG_QUERY_SCAN_ROWS_LIMIT_EXCEEDED to TStatusCode.

Signed-off-by: zihe.liu <[email protected]>
  • Loading branch information
ZiheLiu authored Aug 9, 2024
1 parent 7bf9d6c commit 2e68c7e
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 21 deletions.
4 changes: 4 additions & 0 deletions be/src/common/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ std::string Status::code_as_string() const {
return "Capaticy limit exceeded";
case TStatusCode::SHUTDOWN:
return "Shut down in progress";
case TStatusCode::BIG_QUERY_CPU_SECOND_LIMIT_EXCEEDED:
return "Big query cpu second limit exceeded";
case TStatusCode::BIG_QUERY_SCAN_ROWS_LIMIT_EXCEEDED:
return "Big query scan rows limit exceeded";
}
return {};
}
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ class STATUS_ATTRIBUTE Status {

static Status Shutdown(std::string_view msg) { return Status(TStatusCode::SHUTDOWN, msg); }

// Builds an error Status for a query cancelled because it exceeded the
// resource group's big_query_cpu_second_limit.
// `msg` carries the human-readable detail (current vs. limit) for the audit log.
static Status BigQueryCpuSecondLimitExceeded(std::string_view msg) {
return Status(TStatusCode::BIG_QUERY_CPU_SECOND_LIMIT_EXCEEDED, msg);
}
// Builds an error Status for a query cancelled because it exceeded the
// resource group's big_query_scan_rows_limit.
// `msg` carries the human-readable detail (current vs. limit) for the audit log.
static Status BigQueryScanRowsLimitExceeded(std::string_view msg) {
return Status(TStatusCode::BIG_QUERY_SCAN_ROWS_LIMIT_EXCEEDED, msg);
}

bool ok() const { return _state == nullptr; }

bool is_cancelled() const { return code() == TStatusCode::CANCELLED; }
Expand Down
10 changes: 6 additions & 4 deletions be/src/exec/workgroup/work_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,9 @@ Status WorkGroup::check_big_query(const QueryContext& query_context) {
int64_t query_runtime_ns = query_context.cpu_cost();
if (query_runtime_ns > _big_query_cpu_nanos_limit) {
_bigquery_count++;
return Status::Cancelled(fmt::format("exceed big query cpu limit: current is {}ns but limit is {}ns",
query_runtime_ns, _big_query_cpu_nanos_limit));
return Status::BigQueryCpuSecondLimitExceeded(
fmt::format("exceed big query cpu limit: current is {}ns but limit is {}ns", query_runtime_ns,
_big_query_cpu_nanos_limit));
}
}

Expand All @@ -244,8 +245,9 @@ Status WorkGroup::check_big_query(const QueryContext& query_context) {
query_context.get_scan_limit() > 0 ? query_context.get_scan_limit() : _big_query_scan_rows_limit;
if (_big_query_scan_rows_limit && query_context.cur_scan_rows_num() > bigquery_scan_limit) {
_bigquery_count++;
return Status::Cancelled(fmt::format("exceed big query scan_rows limit: current is {} but limit is {}",
query_context.cur_scan_rows_num(), _big_query_scan_rows_limit));
return Status::BigQueryScanRowsLimitExceeded(
fmt::format("exceed big query scan_rows limit: current is {} but limit is {}",
query_context.cur_scan_rows_num(), _big_query_scan_rows_limit));
}

return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void processOnce() throws IOException {
ctx.setCommand(MysqlCommand.COM_QUERY);
ctx.setStartTime();
ctx.setResourceGroup(null);
ctx.setErrorCode("");
ctx.resetErrorCode();
this.handleQuery();
ctx.setStartTime();

Expand Down
20 changes: 16 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import com.starrocks.thrift.TWorkGroup;
import com.starrocks.warehouse.Warehouse;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -592,12 +593,23 @@ public void setState(QueryState state) {
this.state = state;
}

public String getErrorCode() {
return errorCode;
/**
 * Returns a single normalized error-code string for audit logging.
 * A BE-reported error code (set during backend execution) takes precedence;
 * otherwise the FE-side error type is used; an empty string means no error.
 */
public String getNormalizedErrorCode() {
    // TODO: how to unify TStatusCode, ErrorCode, ErrType, ConnectContext.errorCode
    final String beErrorCode = errorCode;
    if (!StringUtils.isEmpty(beErrorCode)) {
        // error happens in BE execution.
        return beErrorCode;
    }

    // error happens in FE execution (UNKNOWN means no FE error was recorded).
    final QueryState.ErrType feErrType = state.getErrType();
    return feErrType == QueryState.ErrType.UNKNOWN ? "" : feErrType.name();
}

public void setErrorCode(String errorCode) {
this.errorCode = errorCode;
/** Clears the BE-reported error code, e.g. before dispatching a new command. */
public void resetErrorCode() {
    errorCode = "";
}

public void setErrorCodeOnce(String errorCode) {
Expand Down
14 changes: 2 additions & 12 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,9 @@ public void auditAfterExec(String origStmt, StatementBase parsedStmt, PQueryStat
return;
}

// TODO how to unify TStatusCode, ErrorCode, ErrType, ConnectContext.errorCode
String errorCode = "";
if (ctx.getState().getErrType() != QueryState.ErrType.UNKNOWN) {
// error happens in FE execution.
errorCode = ctx.getState().getErrType().name();
} else if (StringUtils.isNotEmpty(ctx.getErrorCode())) {
// error happens in BE execution.
errorCode = ctx.getErrorCode();
}

ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
.setState(ctx.getState().toString())
.setErrorCode(errorCode)
.setErrorCode(ctx.getNormalizedErrorCode())
.setQueryTime(elapseMs)
.setReturnRows(ctx.getReturnRows())
.setStmtId(ctx.getStmtId())
Expand Down Expand Up @@ -548,7 +538,7 @@ private void dispatch() throws IOException {
ctx.setCommand(command);
ctx.setStartTime();
ctx.setResourceGroup(null);
ctx.setErrorCode("");
ctx.resetErrorCode();

switch (command) {
case COM_INIT_DB:
Expand Down
21 changes: 21 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/qe/ConnectContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@

package com.starrocks.qe;

import com.starrocks.common.Status;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.mysql.MysqlCapability;
import com.starrocks.mysql.MysqlChannel;
import com.starrocks.mysql.MysqlCommand;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.thrift.TStatus;
import com.starrocks.thrift.TStatusCode;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.warehouse.DefaultWarehouse;
import mockit.Expectations;
Expand Down Expand Up @@ -250,4 +253,22 @@ public void testWarehouse(@Mocked WarehouseManager warehouseManager) {
ctx.setCurrentWarehouseId(WarehouseManager.DEFAULT_WAREHOUSE_ID);
Assert.assertEquals(WarehouseManager.DEFAULT_WAREHOUSE_ID, ctx.getCurrentWarehouseId());
}

@Test
// Verifies getNormalizedErrorCode() precedence: a BE-reported error code wins
// over the FE error type, and the FE error type is used once the BE code is reset.
public void testGetNormalizedErrorCode() {
ConnectContext ctx = new ConnectContext(socketChannel);
ctx.setState(new QueryState());
Status status = new Status(new TStatus(TStatusCode.MEM_LIMIT_EXCEEDED));

{
// Both a BE error code and an FE err type are set: the BE code takes precedence.
ctx.setErrorCodeOnce(status.getErrorCodeString());
ctx.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
Assert.assertEquals("MEM_LIMIT_EXCEEDED", ctx.getNormalizedErrorCode());
}

{
// After resetting the BE code, the FE err type (still ANALYSIS_ERR from above) is reported.
ctx.resetErrorCode();
Assert.assertEquals("ANALYSIS_ERR", ctx.getNormalizedErrorCode());
}
}
}
3 changes: 3 additions & 0 deletions gensrc/thrift/StatusCode.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,8 @@ enum TStatusCode {
CAPACITY_LIMIT_EXCEED = 58,

SHUTDOWN = 59, // the service is shutting down

BIG_QUERY_CPU_SECOND_LIMIT_EXCEEDED = 60,
BIG_QUERY_SCAN_ROWS_LIMIT_EXCEEDED = 61,
}

63 changes: 63 additions & 0 deletions test/sql/test_resource_group/R/test_resource_group_big_query
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
-- name: test_resource_group_big_query
create table t1 (
k1 int
)
duplicate key(k1)
distributed by hash(k1) buckets 32
properties("replication_num" = "1");
-- result:
-- !result
insert into t1 select generate_series FROM TABLE(generate_series(1, 65535));
-- result:
-- !result
insert into t1 select k1 from t1;
-- result:
-- !result
insert into t1 select k1 from t1;
-- result:
-- !result
insert into t1 select k1 from t1;
-- result:
-- !result
insert into t1 select k1 from t1;
-- result:
-- !result
insert into t1 select k1 from t1;
-- result:
-- !result
insert into t1 select k1 from t1;
-- result:
-- !result
insert into t1 select k1 from t1;
-- result:
-- !result
insert into t1 select k1 from t1;
-- result:
-- !result
insert into t1 select k1 from t1;
-- result:
-- !result
create resource group rg_${uuid0}
to ( user='user_${uuid0}' )
with ('cpu_core_limit' = '1', 'mem_limit' = '0.99', 'big_query_cpu_second_limit'='1');
-- result:
-- !result
with w1 as (select * from t1 union all select * from t1
union all select * from t1 union all select * from t1
union all select * from t1 union all select * from t1
union all select * from t1 union all select * from t1)
select /*+SET_VAR(resource_group='rg_${uuid0}')*/ count(1) from w1;
-- result:
[REGEX].*exceed big query cpu limit: current is \d+ns but limit is 1000000000ns.*
-- !result
alter resource group rg_${uuid0} with ('big_query_cpu_second_limit'='0','big_query_scan_rows_limit'='1');
-- result:
-- !result
select /*+SET_VAR(resource_group='rg_${uuid0}')*/ count(1) from t1;
-- result:
[REGEX].*exceed big query scan_rows limit: current is \d+ but limit is 1.*
-- !result
select count(1) from t1;
-- result:
33553920
-- !result
38 changes: 38 additions & 0 deletions test/sql/test_resource_group/T/test_resource_group_big_query
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
-- name: test_resource_group_big_query
create table t1 (
k1 int
)
duplicate key(k1)
distributed by hash(k1) buckets 32
properties("replication_num" = "1");

insert into t1 select generate_series FROM TABLE(generate_series(1, 65535));
insert into t1 select k1 from t1;
insert into t1 select k1 from t1;
insert into t1 select k1 from t1;
insert into t1 select k1 from t1;
insert into t1 select k1 from t1;
insert into t1 select k1 from t1;
insert into t1 select k1 from t1;
insert into t1 select k1 from t1;
insert into t1 select k1 from t1;

create resource group rg_${uuid0}
to ( user='user_${uuid0}' )
with ('cpu_core_limit' = '1', 'mem_limit' = '0.99', 'big_query_cpu_second_limit'='1');


with w1 as (select * from t1 union all select * from t1
union all select * from t1 union all select * from t1
union all select * from t1 union all select * from t1
union all select * from t1 union all select * from t1)
select /*+SET_VAR(resource_group='rg_${uuid0}')*/ count(1) from w1;


alter resource group rg_${uuid0} with ('big_query_cpu_second_limit'='0','big_query_scan_rows_limit'='1');
select /*+SET_VAR(resource_group='rg_${uuid0}')*/ count(1) from t1;


select count(1) from t1;


0 comments on commit 2e68c7e

Please sign in to comment.