Skip to content

Commit

Permalink
[Enhancement] (nereids)implement KILL COMMAND in nereids
Browse files Browse the repository at this point in the history
  • Loading branch information
Yao-MR committed Dec 18, 2024
1 parent fd2e58b commit 5c19301
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ statementBase
| supportedSetStatement #supportedSetStatementAlias
| supportedUnsetStatement #supportedUnsetStatementAlias
| supportedShowStatement #supportedShowStatementAlias
| supportedKillStatement #supportedKillStatementAlias
| unsupportedStatement #unsupported
;

unsupportedStatement
: unsupportedUseStatement
| unsupportedDmlStatement
| unsupportedKillStatement
| unsupportedDescribeStatement
| unsupportedCreateStatement
| unsupportedDropStatement
Expand Down Expand Up @@ -868,7 +868,7 @@ stageAndPattern
(LEFT_PAREN pattern=STRING_LITERAL RIGHT_PAREN)?
;

unsupportedKillStatement
supportedKillStatement
: KILL (CONNECTION)? INTEGER_VALUE #killConnection
| KILL QUERY (INTEGER_VALUE | STRING_LITERAL) #killQuery
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.commands.KillCommand;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand;
Expand Down Expand Up @@ -4054,6 +4055,21 @@ public LogicalPlan visitShowVariables(ShowVariablesContext ctx) {
}
}

@Override
public KillCommand visitKillConnection(DorisParser.KillConnectionContext ctx) {
int connectionId = Integer.parseInt(ctx.INTEGER_VALUE().getText());
return new KillCommand(true, connectionId);
}

@Override
public KillCommand visitKillQuery(DorisParser.KillQueryContext ctx) {
if (ctx.INTEGER_VALUE() != null) {
return new KillCommand(false, Integer.parseInt(ctx.INTEGER_VALUE().getText()));
} else {
return new KillCommand(stripQuotes(ctx.STRING_LITERAL().getText()));
}
}

@Override
public ShowViewCommand visitShowView(ShowViewContext ctx) {
List<String> tableNameParts = visitMultipartIdentifier(ctx.tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public enum PlanType {

PREPARED_COMMAND,
EXECUTE_COMMAND,
KILL_COMMAND,
SHOW_CONFIG_COMMAND,
SHOW_PROC_COMMAND,
SHOW_ROLE_COMMAND,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;

/**
* Representation of a Kill command.
* Acceptable syntax:
* KILL (CONNECTION)? INTEGER_VALUE #killConnection
* KILL QUERY (INTEGER_VALUE | STRING_LITERAL) #killQuery
*/
public class KillCommand extends Command implements NoForward {
private static final Logger LOG = LogManager.getLogger(KillCommand.class);

private final boolean isConnectionKill;
private final int connectionId;
private final String queryId;

public KillCommand(boolean isConnectionKill, int connectionId) {
super(PlanType.KILL_COMMAND);
this.isConnectionKill = isConnectionKill;
this.connectionId = connectionId;
this.queryId = "";
}

public KillCommand(String queryId) {
super(PlanType.KILL_COMMAND);
this.isConnectionKill = false;
this.connectionId = -1;
this.queryId = queryId;
}

@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
ConnectContext killCtx = null;
if (connectionId == -1) {
// when killCtx == null, this means the query not in FE,
// then we just send kill signal to BE
killCtx = ctx.getConnectScheduler().getContextWithQueryId(queryId);
} else {
killCtx = ctx.getConnectScheduler().getContext(connectionId);
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, connectionId);
}
}

if (killCtx == null) {
TUniqueId tQueryId = null;
try {
tQueryId = DebugUtil.parseTUniqueIdFromString(queryId);
} catch (NumberFormatException e) {
throw new UserException(e.getMessage());
}
LOG.info("kill query {}", queryId);
Collection<Backend> nodesToPublish = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values();
for (Backend be : nodesToPublish) {
if (be.isAlive()) {
try {
Status cancelReason = new Status(TStatusCode.CANCELLED, "user kill query");
BackendServiceProxy.getInstance()
.cancelPipelineXPlanFragmentAsync(be.getBrpcAddress(), tQueryId, cancelReason);
} catch (Throwable t) {
LOG.info("send kill query {} rpc to be {} failed", queryId, be);
}
}
}
} else if (ctx == killCtx) {
// Suicide
ctx.setKilled();
} else {
// Check auth
// Only user itself and user with admin priv can kill connection
if (!killCtx.getQualifiedUser().equals(ConnectContext.get().getQualifiedUser())
&& !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
ErrorReport.reportDdlException(ErrorCode.ERR_KILL_DENIED_ERROR, connectionId);
}

killCtx.kill(isConnectionKill);
}
ctx.getState().setOk();
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("KILL ");
if (!isConnectionKill) {
sb.append("QUERY ");
}
sb.append(connectionId);
return sb.toString();
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitKillCommand(this, context);
}

@Override
public StmtType stmtType() {
return StmtType.KILL;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.doris.nereids.trees.plans.commands.DropProcedureCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.commands.KillCommand;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand;
Expand Down Expand Up @@ -209,6 +210,10 @@ default R visitUnsetVariableCommand(UnsetVariableCommand unsetVariableCommand, C
return visitCommand(unsetVariableCommand, context);
}

default R visitKillCommand(KillCommand killCommand, C context) {
return visitCommand(killCommand, context);
}

default R visitUnsetDefaultStorageVaultCommand(UnsetDefaultStorageVaultCommand unsetDefaultStorageVaultCommand,
C context) {
return visitCommand(unsetDefaultStorageVaultCommand, context);
Expand Down
15 changes: 15 additions & 0 deletions regression-test/data/nereids_p0/kill/baseall.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
0,1,1989,1001,11011902,123.123,true,1989-03-21,1989-03-21 13:00:00,wangjuoo4,0.1,6.333,string12345,170141183460469231731687303715884105727
0,2,1986,1001,11011903,1243.5,false,1901-12-31,1989-03-21 13:00:00,wangynnsf,20.268,789.25,string12345,-170141183460469231731687303715884105727
0,3,1989,1002,11011905,24453.325,false,2012-03-14,2000-01-01 00:00:00,yunlj8@nk,78945,3654.0,string12345,0
0,4,1991,3021,-11011907,243243.325,false,3124-10-10,2015-03-13 10:30:00,yanvjldjlll,2.06,-0.001,string12345,20220101
0,5,1985,5014,-11011903,243.325,true,2015-01-01,2015-03-13 12:36:38,du3lnvl,-0.000,-365,string12345,20220102
0,6,32767,3021,123456,604587.000,true,2014-11-11,2015-03-13 12:36:38,yanavnd,0.1,80699,string12345,20220104
0,7,-32767,1002,7210457,3.141,false,1988-03-21,1901-01-01 00:00:00,jiw3n4,0.0,6058,string12345,-20220101
1,8,255,2147483647,11011920,-0.123,true,1989-03-21,9999-11-11 12:12:00,wangjuoo5,987456.123,12.14,string12345,-2022
1,9,1991,-2147483647,11011902,-654.654,true,1991-08-11,1989-03-21 13:11:00,wangjuoo4,0.000,69.123,string12345,11011903
1,10,1991,5014,9223372036854775807,-258.369,false,2015-04-02,2013-04-02 15:16:52,wangynnsf,-123456.54,0.235,string12345,-11011903
1,11,1989,25699,-9223372036854775807,0.666,true,2015-04-02,1989-03-21 13:11:00,yunlj8@nk,-987.001,4.336,string12345,1701411834604692317316873037158
1,12,32767,-2147483647,9223372036854775807,243.325,false,1991-08-11,2013-04-02 15:16:52,lifsno,-564.898,3.141592654,string12345,1701604692317316873037158
1,13,-32767,2147483647,-9223372036854775807,100.001,false,2015-04-02,2015-04-02 00:00:00,wenlsfnl,123.456,3.141592653,string12345,701411834604692317316873037158
1,14,255,103,11011902,-0.000,false,2015-04-02,2015-04-02 00:00:00, ,3.141592654,2.036,string12345,701411834604692317316873
1,15,1992,3021,11011920,0.00,true,9999-12-12,2015-04-02 00:00:00,,3.141592653,20.456,string12345,701411834604692317
144 changes: 144 additions & 0 deletions regression-test/suites/nereids_p0/kill/test_kill_command.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods
import org.apache.doris.regression.util.Http

suite("kill_command") {
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"

sql """ DROP TABLE IF EXISTS `test` """
sql """
CREATE TABLE IF NOT EXISTS `baseall` (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double max null comment "",
`k9` float sum null comment "",
`k12` string replace null comment "",
`k13` largeint(40) replace null comment ""
) engine=olap
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
"""
sql """
CREATE TABLE IF NOT EXISTS `test` (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double max null comment "",
`k9` float sum null comment "",
`k12` string replace_if_not_null null comment "",
`k13` largeint(40) replace null comment ""
) engine=olap
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
"""

GetDebugPoint().clearDebugPointsForAllBEs()
streamLoad {
table "baseall"
set 'column_separator', ','
file "baseall.txt"
}

try {
GetDebugPoint().enableDebugPointForAllBEs("TabletStream.append_data.long_wait")
def thread1 = new Thread({
try {
def res = sql "insert into test select * from baseall where k1 <= 3"
logger.info(res.toString())
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("Communications link failure"))
}
})
thread1.start()

sleep(1000)

def processList = sql "show processlist"
logger.info(processList.toString())
processList.each { item ->
logger.info(item[1].toString())
logger.info(item[10].toString())
logger.info(item[11].toString())
if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){
def res = sql "kill ${item[1]}"
logger.info(res.toString())
}
}
} catch(Exception e) {
logger.info(e.getMessage())
} finally {
GetDebugPoint().disableDebugPointForAllBEs("TabletStream.append_data.long_wait")
}

streamLoad {
table "baseall"
set 'column_separator', ','
file "baseall.txt"
}

try {
GetDebugPoint().enableDebugPointForAllBEs("TabletStream.append_data.long_wait")
def thread1 = new Thread({
try {
def res = sql "insert into test select * from baseall where k1 <= 3"
logger.info(res.toString())
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains("Communications link failure"))
}
})
thread1.start()

sleep(1000)

def processList = sql "show processlist"
logger.info(processList.toString())
processList.each { item ->
logger.info(item[1].toString())
logger.info(item[10].toString())
logger.info(item[11].toString())
if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){
def res = sql """ kill query "${item[10]}" """
logger.info(res.toString())
}
}
} catch(Exception e) {
logger.info(e.getMessage())
} finally {
GetDebugPoint().disableDebugPointForAllBEs("TabletStream.append_data.long_wait")
}

sql """ DROP TABLE IF EXISTS `baseall` """
sql """ DROP TABLE IF EXISTS `test` """
}

0 comments on commit 5c19301

Please sign in to comment.