Skip to content

Commit

Permalink
[Enhancement] Support insert timeout
Browse files Browse the repository at this point in the history
Signed-off-by: wyb <[email protected]>
  • Loading branch information
wyb committed Sep 4, 2024
1 parent 37b6339 commit aa60ea4
Show file tree
Hide file tree
Showing 25 changed files with 222 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ private void execute(ExecPlan execPlan, ConnectContext context) throws Exception
fragments, scanNodes);

QeProcessorImpl.INSTANCE.registerQuery(queryId, coord);
int leftTimeSecond = context.getSessionVariable().getQueryTimeoutS();
int leftTimeSecond = context.getExecTimeout();
coord.setTimeoutSecond(leftTimeSecond);
coord.exec();

Expand Down
3 changes: 1 addition & 2 deletions fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ public enum ErrorCode {
ERR_INVALID_VALUE(5018, new byte[] {'H', 'Y', '0', '0', '0'}, "Invalid %s: '%s'. Expected values should be %s"),
ERR_NO_ALTER_OPERATION(5023, new byte[] {'H', 'Y', '0', '0', '0'},
"No operation in alter statement"),
ERR_QUERY_TIMEOUT(5024, new byte[] {'H', 'Y', '0', '0', '0'},
"Query timeout. %s"),
ERR_TIMEOUT(5024, new byte[] {'H', 'Y', '0', '0', '0'}, "%s reached its timeout of %d seconds, %s"),
ERR_FAILED_WHEN_INSERT(5025, new byte[] {'H', 'Y', '0', '0', '0'}, "Failed when INSERT execute"),
ERR_UNSUPPORTED_TYPE_IN_CTAS(5026, new byte[] {'H', 'Y', '0', '0', '0'},
"Unsupported type '%s' in create table as select statement"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static DataCacheSelectMetrics cacheSelect(DataCacheSelectStatement statem
DataCacheSelectMetrics metrics = null;
Coordinator coordinator = stmtExecutor.getCoordinator();
Preconditions.checkNotNull(coordinator, "Coordinator can't be null");
coordinator.join(connectContext.getSessionVariable().getQueryTimeoutS());
coordinator.join(stmtExecutor.getExecTimeout());
if (coordinator.isDone()) {
metrics = stmtExecutor.getCoordinator().getDataCacheSelectMetrics();
}
Expand Down
33 changes: 27 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import com.starrocks.sql.analyzer.Authorizer;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.CleanTemporaryTableStmt;
import com.starrocks.sql.ast.DmlStmt;
import com.starrocks.sql.ast.SetListItem;
import com.starrocks.sql.ast.SetStmt;
import com.starrocks.sql.ast.SetType;
Expand Down Expand Up @@ -911,6 +912,18 @@ public void kill(boolean killConnection, String cancelledMessage) {
}
}

public int getExecTimeout() {
return executor != null ? executor.getExecTimeout() : sessionVariable.getQueryTimeoutS();
}

private String getExecType() {
return executor != null ? executor.getExecType() : "Query";
}

private boolean isExecLoadType() {
return executor != null && executor.getParsedStmt() instanceof DmlStmt;
}

public void checkTimeout(long now) {
long startTimeMillis = getStartTime();
if (startTimeMillis <= 0) {
Expand All @@ -920,27 +933,35 @@ public void checkTimeout(long now) {
long delta = now - startTimeMillis;
boolean killFlag = false;
boolean killConnection = false;
String errMsg = "";
if (command == MysqlCommand.COM_SLEEP) {
if (delta > sessionVariable.getWaitTimeoutS() * 1000L) {
int waitTimeout = sessionVariable.getWaitTimeoutS();
if (delta > waitTimeout * 1000L) {
// Need kill this connection.
LOG.warn("kill wait timeout connection, remote: {}, wait timeout: {}",
getMysqlChannel().getRemoteHostPortString(), sessionVariable.getWaitTimeoutS());
getMysqlChannel().getRemoteHostPortString(), waitTimeout);

killFlag = true;
killConnection = true;

errMsg = String.format("Connection reached its wait timeout of %d seconds", waitTimeout);
}
} else {
long timeoutSecond = sessionVariable.getQueryTimeoutS();
long timeoutSecond = getExecTimeout();
if (delta > timeoutSecond * 1000L) {
LOG.warn("kill query timeout, remote: {}, query timeout: {}",
getMysqlChannel().getRemoteHostPortString(), sessionVariable.getQueryTimeoutS());
LOG.warn("kill {} timeout, remote: {}, query timeout: {}", getExecType(),
getMysqlChannel().getRemoteHostPortString(), timeoutSecond);

// Only kill
killFlag = true;

String suggestedMsg = String.format("please increase the '%s' session variable",
isExecLoadType() ? "insert_timeout" : "query_timeout");
errMsg = ErrorCode.ERR_TIMEOUT.formatErrorMsg(getExecType(), timeoutSecond, suggestedMsg);
}
}
if (killFlag) {
kill(killConnection, "query timeout");
kill(killConnection, errMsg);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ public LeaderOpExecutor(Pair<String, Integer> ipAndPort, StatementBase parsedStm
this.originStmt = originStmt;
this.ctx = ctx;
if (status.isNeedToWaitJournalSync()) {
this.waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
this.waitTimeoutMs = ctx.getExecTimeout() * 1000;
} else {
this.waitTimeoutMs = 0;
}
// set thriftTimeoutMs to query_timeout + thrift_rpc_timeout_ms
// set thriftTimeoutMs to exec timeout + thrift_rpc_timeout_ms
// so that we can return an execution timeout instead of a network timeout
this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000 + Config.thrift_rpc_timeout_ms;
this.thriftTimeoutMs = ctx.getExecTimeout() * 1000 + Config.thrift_rpc_timeout_ms;
if (this.thriftTimeoutMs < 0) {
this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
this.thriftTimeoutMs = ctx.getExecTimeout() * 1000;
}
this.parsedStmt = parsedStmt;
}
Expand Down
21 changes: 11 additions & 10 deletions fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

package com.starrocks.qe;

import com.starrocks.common.ErrorCode;
import com.starrocks.common.Status;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.metric.MetricRepo;
Expand Down Expand Up @@ -62,19 +63,20 @@ public class ResultReceiver {
private volatile boolean isDone = false;
private volatile boolean isCancel = false;
private long packetIdx = 0;
private final long timeoutTs;
private final int timeoutMs;
private final long deadlineMs;
private final TNetworkAddress address;
private final PUniqueId finstId;
private final Long backendId;
private Thread currentThread;

public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, int timeoutMs) {
this.finstId = new PUniqueId();
this.finstId.hi = tid.hi;
this.finstId.lo = tid.lo;
this.backendId = backendId;
this.address = address;
this.timeoutTs = System.currentTimeMillis() + timeoutMs;
this.timeoutMs = timeoutMs;
this.deadlineMs = System.currentTimeMillis() + timeoutMs;
}

public RowBatch getNext(Status status) throws TException {
Expand All @@ -90,11 +92,11 @@ public RowBatch getNext(Status status) throws TException {
PFetchDataResult pResult = null;
while (pResult == null) {
long currentTs = System.currentTimeMillis();
if (currentTs >= timeoutTs) {
if (currentTs >= deadlineMs) {
throw new TimeoutException("query timeout");
}
try {
pResult = future.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS);
pResult = future.get(deadlineMs - currentTs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// continue to get result
LOG.info("future get interrupted Exception");
Expand Down Expand Up @@ -139,17 +141,16 @@ public RowBatch getNext(Status status) throws TException {
LOG.warn("fetch result execution exception, finstId={}", DebugUtil.printId(finstId), e);
if (e.getMessage().contains("time out")) {
// if timeout, we set error code to TIMEOUT, and it will not retry querying.
status.setStatus(new Status(TStatusCode.TIMEOUT,
String.format("Query exceeded time limit of %d seconds",
ConnectContext.get().getSessionVariable().getQueryTimeoutS())));
status.setStatus(new Status(TStatusCode.TIMEOUT, ErrorCode.ERR_TIMEOUT.formatErrorMsg(
"Query", timeoutMs / 1000, "please increase the 'query_timeout' session variable")));
} else {
status.setRpcStatus(e.getMessage());
SimpleScheduler.addToBlocklist(backendId);
}
} catch (TimeoutException e) {
LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e);
status.setInternalErrorStatus(String.format("Query exceeded time limit of %d seconds",
ConnectContext.get().getSessionVariable().getQueryTimeoutS()));
status.setInternalErrorStatus(ErrorCode.ERR_TIMEOUT.formatErrorMsg(
"Query", timeoutMs / 1000, "please increase the 'query_timeout' session variable"));
if (MetricRepo.hasInit) {
MetricRepo.COUNTER_QUERY_TIMEOUT.increase(1L);
}
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String MAX_PARALLEL_SCAN_INSTANCE_NUM = "max_parallel_scan_instance_num";
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
public static final String INSERT_MAX_FILTER_RATIO = "insert_max_filter_ratio";
public static final String INSERT_TIMEOUT = "insert_timeout";
public static final String ENABLE_SPILL = "enable_spill";
public static final String ENABLE_SPILL_TO_REMOTE_STORAGE = "enable_spill_to_remote_storage";
public static final String DISABLE_SPILL_TO_LOCAL_DISK = "disable_spill_to_local_disk";
Expand Down Expand Up @@ -1124,6 +1125,9 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = INSERT_MAX_FILTER_RATIO)
private double insertMaxFilterRatio = 0;

@VariableMgr.VarAttr(name = INSERT_TIMEOUT)
private int insertTimeoutS = 14400;

@VariableMgr.VarAttr(name = ENABLE_SPILL)
private boolean enableSpill = false;

Expand Down Expand Up @@ -2723,6 +2727,14 @@ public void setInsertMaxFilterRatio(double insertMaxFilterRatio) {
this.insertMaxFilterRatio = insertMaxFilterRatio;
}

public int getInsertTimeoutS() {
return insertTimeoutS;
}

public void setInsertTimeoutS(int insertTimeoutS) {
this.insertTimeoutS = insertTimeoutS;
}

public boolean isEnableSpill() {
return enableSpill;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,7 @@ public void exec() throws UserException {
if (null == future) {
return;
}
PExecShortCircuitResult shortCircuitResult = future.get(
context.getSessionVariable().getQueryTimeoutS(), TimeUnit.SECONDS);
PExecShortCircuitResult shortCircuitResult = future.get(context.getExecTimeout(), TimeUnit.SECONDS);
watch.stop();
long t = watch.elapsed().toMillis();
MetricRepo.HISTO_SHORTCIRCUIT_RPC_LATENCY.update(t);
Expand Down
46 changes: 37 additions & 9 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,8 @@ private void handleAnalyzeStmt() throws IOException {
analyzeStatus.setStatus(StatsConstants.ScheduleStatus.PENDING);
GlobalStateMgr.getCurrentState().getAnalyzeMgr().replayAddAnalyzeStatus(analyzeStatus);

int timeout = context.getSessionVariable().getQueryTimeoutS();
int queryTimeout = context.getSessionVariable().getQueryTimeoutS();
int insertTimeout = context.getSessionVariable().getInsertTimeoutS();
try {
Future<?> future = GlobalStateMgr.getCurrentState().getAnalyzeMgr().getAnalyzeTaskThreadPool()
.submit(() -> executeAnalyze(analyzeStmt, analyzeStatus, db, table));
Expand All @@ -1303,6 +1304,7 @@ private void handleAnalyzeStmt() throws IOException {
// will print warning log if timeout, so we update timeout temporarily to avoid
// warning log
context.getSessionVariable().setQueryTimeoutS((int) Config.statistic_collect_query_timeout);
context.getSessionVariable().setInsertTimeoutS((int) Config.statistic_collect_query_timeout);
future.get();
}
} catch (RejectedExecutionException e) {
Expand All @@ -1314,7 +1316,8 @@ private void handleAnalyzeStmt() throws IOException {
analyzeStatus.setReason("The statistics tasks running failed");
GlobalStateMgr.getCurrentState().getAnalyzeMgr().addAnalyzeStatus(analyzeStatus);
} finally {
context.getSessionVariable().setQueryTimeoutS(timeout);
context.getSessionVariable().setQueryTimeoutS(queryTimeout);
context.getSessionVariable().setInsertTimeoutS(insertTimeout);
}

ShowResultSet resultSet = analyzeStatus.toShowResult();
Expand Down Expand Up @@ -2167,7 +2170,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
estimateFileNum,
estimateScanFileSize,
type,
ConnectContext.get().getSessionVariable().getQueryTimeoutS(),
getExecTimeout(),
coord);
loadJob.setJobProperties(stmt.getProperties());
jobId = loadJob.getId();
Expand All @@ -2192,8 +2195,9 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
coord.setTopProfileSupplier(this::buildTopLevelProfile);
coord.setExecPlan(execPlan);

long jobDeadLineMs = System.currentTimeMillis() + context.getSessionVariable().getQueryTimeoutS() * 1000;
coord.join(context.getSessionVariable().getQueryTimeoutS());
int timeout = getExecTimeout();
long jobDeadLineMs = System.currentTimeMillis() + timeout * 1000;
coord.join(timeout);
if (!coord.isDone()) {
/*
* In this case, There are two factors that lead query cancelled:
Expand All @@ -2214,16 +2218,16 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
coord.cancel(ErrorCode.ERR_QUERY_EXCEPTION.formatErrorMsg());
ErrorReport.reportDdlException(ErrorCode.ERR_QUERY_EXCEPTION);
} else {
coord.cancel(ErrorCode.ERR_QUERY_TIMEOUT.formatErrorMsg());
coord.cancel(ErrorCode.ERR_TIMEOUT.formatErrorMsg(getExecType(), timeout, ""));
if (coord.isThriftServerHighLoad()) {
ErrorReport.reportDdlException(ErrorCode.ERR_QUERY_TIMEOUT,
ErrorReport.reportDdlException(ErrorCode.ERR_TIMEOUT, getExecType(), timeout,
"Please check the thrift-server-pool metrics, " +
"if the pool size reaches thrift_server_max_worker_threads(default is 4096), " +
"you can set the config to a higher value in fe.conf, " +
"or set parallel_fragment_exec_instance_num to a lower value in session variable");
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_QUERY_TIMEOUT,
"Increase the query_timeout session variable and retry");
ErrorReport.reportDdlException(ErrorCode.ERR_TIMEOUT, getExecType(), timeout,
"please increase the 'insert_timeout' session variable");
}
}
}
Expand Down Expand Up @@ -2689,4 +2693,28 @@ public void addFinishedQueryDetail() {
QueryDetailQueue.addQueryDetail(queryDetail);
}

public int getExecTimeout() {
if (parsedStmt instanceof DmlStmt) {
int timeout = ((DmlStmt) parsedStmt).getTimeout();
return timeout != -1 ? timeout : context.getSessionVariable().getInsertTimeoutS();
}
if (parsedStmt instanceof CreateTableAsSelectStmt) {
InsertStmt insertStmt = ((CreateTableAsSelectStmt) parsedStmt).getInsertStmt();
int timeout = insertStmt.getTimeout();
return timeout != -1 ? timeout : context.getSessionVariable().getInsertTimeoutS();
}
return context.getSessionVariable().getQueryTimeoutS();
}

public String getExecType() {
if (parsedStmt instanceof InsertStmt || parsedStmt instanceof CreateTableAsSelectStmt) {
return "Insert";
} else if (parsedStmt instanceof UpdateStmt) {
return "Update";
} else if (parsedStmt instanceof DeleteStmt) {
return "Delete";
} else {
return "Query";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public static JobSpec fromQuerySpec(ConnectContext context,
TQueryType queryType) {
TQueryOptions queryOptions = context.getSessionVariable().toThrift();
queryOptions.setQuery_type(queryType);
queryOptions.setQuery_timeout(context.getExecTimeout());

TQueryGlobals queryGlobals = genQueryGlobals(context.getStartTimeInstant(),
context.getSessionVariable().getTimeZone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ private void changeDefaultConnectContextIfNeeded(ConnectContext mvConnectCtx) {

// change `query_timeout` to 1 hour by default for better user experience.
if (!mvProperty.getProperties().containsKey(MV_SESSION_TIMEOUT)) {
mvSessionVariable.setQueryTimeoutS(MV_DEFAULT_QUERY_TIMEOUT);
mvSessionVariable.setInsertTimeoutS(MV_DEFAULT_QUERY_TIMEOUT);
}

// set insert_max_filter_ratio by default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ void buildPhysicalTopology() throws Exception {
// NOTE use a fake transaction id, the real one would be generated when epoch started
long fakeTransactionId = 1;
long dbId = getView().getDbId();
long timeout = context.getSessionVariable().getQueryTimeoutS();
long timeout = context.getExecTimeout();
dataSink.init(context.getExecutionId(), fakeTransactionId, dbId, timeout);
dataSink.complete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2350,7 +2350,7 @@ public Future<TStatus> refreshOtherFesTable(TNetworkAddress thriftAddress, Table
if (ConnectContext.get() == null || ConnectContext.get().getSessionVariable() == null) {
timeout = Config.thrift_rpc_timeout_ms * 10;
} else {
timeout = ConnectContext.get().getSessionVariable().getQueryTimeoutS() * 1000 + Config.thrift_rpc_timeout_ms;
timeout = ConnectContext.get().getExecTimeout() * 1000 + Config.thrift_rpc_timeout_ms;
}

FutureTask<TStatus> task = new FutureTask<TStatus>(() -> {
Expand Down
3 changes: 1 addition & 2 deletions fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -1127,8 +1127,7 @@ public void setConfig(AdminSetConfigStmt stmt) throws DdlException {
setFrontendConfig(stmt.getConfig().getMap());

List<Frontend> allFrontends = getFrontends(null);
int timeout = ConnectContext.get().getSessionVariable().getQueryTimeoutS() * 1000
+ Config.thrift_rpc_timeout_ms;
int timeout = ConnectContext.get().getExecTimeout() * 1000 + Config.thrift_rpc_timeout_ms;
StringBuilder errMsg = new StringBuilder();
for (Frontend fe : allFrontends) {
if (fe.getHost().equals(getSelfNode().first)) {
Expand Down
Loading

0 comments on commit aa60ea4

Please sign in to comment.