diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/DictionaryMgr.java b/fe/fe-core/src/main/java/com/starrocks/catalog/DictionaryMgr.java index 4fb68f1ccfb0fc..a676890bdc7c9e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/DictionaryMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/DictionaryMgr.java @@ -630,7 +630,7 @@ private void execute(ExecPlan execPlan, ConnectContext context) throws Exception fragments, scanNodes); QeProcessorImpl.INSTANCE.registerQuery(queryId, coord); - int leftTimeSecond = context.getSessionVariable().getQueryTimeoutS(); + int leftTimeSecond = context.getExecTimeout(); coord.setTimeoutSecond(leftTimeSecond); coord.exec(); diff --git a/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java b/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java index a9cfcb2857c842..22f2a79bd6aced 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java @@ -144,8 +144,7 @@ public enum ErrorCode { ERR_INVALID_VALUE(5018, new byte[] {'H', 'Y', '0', '0', '0'}, "Invalid %s: '%s'. Expected values should be %s"), ERR_NO_ALTER_OPERATION(5023, new byte[] {'H', 'Y', '0', '0', '0'}, "No operation in alter statement"), - ERR_QUERY_TIMEOUT(5024, new byte[] {'H', 'Y', '0', '0', '0'}, - "Query timeout. %s"), + ERR_TIMEOUT(5024, new byte[] {'H', 'Y', '0', '0', '0'}, "%s reached its timeout of %d seconds, %s"), ERR_FAILED_WHEN_INSERT(5025, new byte[] {'H', 'Y', '0', '0', '0'}, "Failed when INSERT execute"), ERR_UNSUPPORTED_TYPE_IN_CTAS(5026, new byte[] {'H', 'Y', '0', '0', '0'}, "Unsupported type '%s' in create table as select statement"), diff --git a/fe/fe-core/src/main/java/com/starrocks/datacache/DataCacheSelectExecutor.java b/fe/fe-core/src/main/java/com/starrocks/datacache/DataCacheSelectExecutor.java index 0fe2ec9f7c8c25..9c882ccf0265f9 100644 --- a/fe/fe-core/src/main/java/com/starrocks/datacache/DataCacheSelectExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/datacache/DataCacheSelectExecutor.java @@ -76,7 +76,7 @@ public static DataCacheSelectMetrics cacheSelect(DataCacheSelectStatement statem DataCacheSelectMetrics metrics = null; Coordinator coordinator = stmtExecutor.getCoordinator(); Preconditions.checkNotNull(coordinator, "Coordinator can't be null"); - coordinator.join(connectContext.getSessionVariable().getQueryTimeoutS()); + coordinator.join(stmtExecutor.getExecTimeout()); if (coordinator.isDone()) { metrics = stmtExecutor.getCoordinator().getDataCacheSelectMetrics(); } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java index 9d9a223ad1f398..b4073b214c5fa4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java @@ -66,6 +66,7 @@ import com.starrocks.sql.analyzer.Authorizer; import com.starrocks.sql.analyzer.SemanticException; import com.starrocks.sql.ast.CleanTemporaryTableStmt; +import com.starrocks.sql.ast.DmlStmt; import com.starrocks.sql.ast.SetListItem; import com.starrocks.sql.ast.SetStmt; import com.starrocks.sql.ast.SetType; @@ -911,6 +912,18 @@ public void kill(boolean killConnection, String cancelledMessage) { } } + public int getExecTimeout() { + return executor != null ? executor.getExecTimeout() : sessionVariable.getQueryTimeoutS(); + } + + private String getExecType() { + return executor != null ? executor.getExecType() : "Query"; + } + + private boolean isExecLoadType() { + return executor != null && executor.getParsedStmt() instanceof DmlStmt; + } + public void checkTimeout(long now) { long startTimeMillis = getStartTime(); if (startTimeMillis <= 0) { @@ -920,27 +933,35 @@ public void checkTimeout(long now) { long delta = now - startTimeMillis; boolean killFlag = false; boolean killConnection = false; + String errMsg = ""; if (command == MysqlCommand.COM_SLEEP) { - if (delta > sessionVariable.getWaitTimeoutS() * 1000L) { + int waitTimeout = sessionVariable.getWaitTimeoutS(); + if (delta > waitTimeout * 1000L) { // Need kill this connection. LOG.warn("kill wait timeout connection, remote: {}, wait timeout: {}", - getMysqlChannel().getRemoteHostPortString(), sessionVariable.getWaitTimeoutS()); + getMysqlChannel().getRemoteHostPortString(), waitTimeout); killFlag = true; killConnection = true; + + errMsg = String.format("Connection reached its wait timeout of %d seconds", waitTimeout); } } else { - long timeoutSecond = sessionVariable.getQueryTimeoutS(); + long timeoutSecond = getExecTimeout(); if (delta > timeoutSecond * 1000L) { - LOG.warn("kill query timeout, remote: {}, query timeout: {}", - getMysqlChannel().getRemoteHostPortString(), sessionVariable.getQueryTimeoutS()); + LOG.warn("kill {} timeout, remote: {}, query timeout: {}", getExecType(), + getMysqlChannel().getRemoteHostPortString(), timeoutSecond); // Only kill killFlag = true; + + String suggestedMsg = String.format("please increase the '%s' session variable", + isExecLoadType() ? "insert_timeout" : "query_timeout"); + errMsg = ErrorCode.ERR_TIMEOUT.formatErrorMsg(getExecType(), timeoutSecond, suggestedMsg); } } if (killFlag) { - kill(killConnection, "query timeout"); + kill(killConnection, errMsg); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java index e9fbfc4636da00..3605464fa9d1ba 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java @@ -97,15 +97,15 @@ public LeaderOpExecutor(Pair ipAndPort, StatementBase parsedStm this.originStmt = originStmt; this.ctx = ctx; if (status.isNeedToWaitJournalSync()) { - this.waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000; + this.waitTimeoutMs = ctx.getExecTimeout() * 1000; } else { this.waitTimeoutMs = 0; } - // set thriftTimeoutMs to query_timeout + thrift_rpc_timeout_ms + // set thriftTimeoutMs to exec timeout + thrift_rpc_timeout_ms // so that we can return an execution timeout instead of a network timeout - this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000 + Config.thrift_rpc_timeout_ms; + this.thriftTimeoutMs = ctx.getExecTimeout() * 1000 + Config.thrift_rpc_timeout_ms; if (this.thriftTimeoutMs < 0) { - this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000; + this.thriftTimeoutMs = ctx.getExecTimeout() * 1000; } this.parsedStmt = parsedStmt; } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java b/fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java index 98ddea619bb94f..fc71057298ac01 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java @@ -34,6 +34,7 @@ package com.starrocks.qe; +import com.starrocks.common.ErrorCode; import com.starrocks.common.Status; import com.starrocks.common.util.DebugUtil; import com.starrocks.metric.MetricRepo; @@ -62,11 +63,11 @@ public class ResultReceiver { private volatile boolean isDone = false; private volatile boolean isCancel = false; private long packetIdx = 0; - private final long timeoutTs; + private final int timeoutMs; + private final long deadlineMs; private final TNetworkAddress address; private final PUniqueId finstId; private final Long backendId; - private Thread currentThread; public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, int timeoutMs) { this.finstId = new PUniqueId(); @@ -74,7 +75,8 @@ public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, in this.finstId.lo = tid.lo; this.backendId = backendId; this.address = address; - this.timeoutTs = System.currentTimeMillis() + timeoutMs; + this.timeoutMs = timeoutMs; + this.deadlineMs = System.currentTimeMillis() + timeoutMs; } public RowBatch getNext(Status status) throws TException { @@ -90,11 +92,11 @@ public RowBatch getNext(Status status) throws TException { PFetchDataResult pResult = null; while (pResult == null) { long currentTs = System.currentTimeMillis(); - if (currentTs >= timeoutTs) { + if (currentTs >= deadlineMs) { throw new TimeoutException("query timeout"); } try { - pResult = future.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS); + pResult = future.get(deadlineMs - currentTs, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { // continue to get result LOG.info("future get interrupted Exception"); @@ -139,17 +141,16 @@ public RowBatch getNext(Status status) throws TException { LOG.warn("fetch result execution exception, finstId={}", DebugUtil.printId(finstId), e); if (e.getMessage().contains("time out")) { // if timeout, we set error code to TIMEOUT, and it will not retry querying. - status.setStatus(new Status(TStatusCode.TIMEOUT, - String.format("Query exceeded time limit of %d seconds", - ConnectContext.get().getSessionVariable().getQueryTimeoutS()))); + status.setStatus(new Status(TStatusCode.TIMEOUT, ErrorCode.ERR_TIMEOUT.formatErrorMsg( + "Query", timeoutMs / 1000, "please increase the 'query_timeout' session variable"))); } else { status.setRpcStatus(e.getMessage()); SimpleScheduler.addToBlocklist(backendId); } } catch (TimeoutException e) { LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e); - status.setInternalErrorStatus(String.format("Query exceeded time limit of %d seconds", - ConnectContext.get().getSessionVariable().getQueryTimeoutS())); + status.setInternalErrorStatus(ErrorCode.ERR_TIMEOUT.formatErrorMsg( + "Query", timeoutMs / 1000, "please increase the 'query_timeout' session variable")); if (MetricRepo.hasInit) { MetricRepo.COUNTER_QUERY_TIMEOUT.increase(1L); } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index 684aec3fa23f91..dd89c148811e85 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -189,6 +189,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable { public static final String MAX_PARALLEL_SCAN_INSTANCE_NUM = "max_parallel_scan_instance_num"; public static final String ENABLE_INSERT_STRICT = "enable_insert_strict"; public static final String INSERT_MAX_FILTER_RATIO = "insert_max_filter_ratio"; + public static final String INSERT_TIMEOUT = "insert_timeout"; public static final String ENABLE_SPILL = "enable_spill"; public static final String ENABLE_SPILL_TO_REMOTE_STORAGE = "enable_spill_to_remote_storage"; public static final String DISABLE_SPILL_TO_LOCAL_DISK = "disable_spill_to_local_disk"; @@ -1124,6 +1125,9 @@ public static MaterializedViewRewriteMode parse(String str) { @VariableMgr.VarAttr(name = INSERT_MAX_FILTER_RATIO) private double insertMaxFilterRatio = 0; + @VariableMgr.VarAttr(name = INSERT_TIMEOUT) + private int insertTimeoutS = 14400; + @VariableMgr.VarAttr(name = ENABLE_SPILL) private boolean enableSpill = false; @@ -2723,6 +2727,14 @@ public void setInsertMaxFilterRatio(double insertMaxFilterRatio) { this.insertMaxFilterRatio = insertMaxFilterRatio; } + public int getInsertTimeoutS() { + return insertTimeoutS; + } + + public void setInsertTimeoutS(int insertTimeoutS) { + this.insertTimeoutS = insertTimeoutS; + } + public boolean isEnableSpill() { return enableSpill; } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ShortCircuitHybridExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/ShortCircuitHybridExecutor.java index 298429a4055b5a..2c1597fd3f21a4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ShortCircuitHybridExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ShortCircuitHybridExecutor.java @@ -113,8 +113,7 @@ public void exec() throws UserException { if (null == future) { return; } - PExecShortCircuitResult shortCircuitResult = future.get( - context.getSessionVariable().getQueryTimeoutS(), TimeUnit.SECONDS); + PExecShortCircuitResult shortCircuitResult = future.get(context.getExecTimeout(), TimeUnit.SECONDS); watch.stop(); long t = watch.elapsed().toMillis(); MetricRepo.HISTO_SHORTCIRCUIT_RPC_LATENCY.update(t); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index b2b5834a3b71a9..d5cbc0e58cdc3b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -1293,7 +1293,8 @@ private void handleAnalyzeStmt() throws IOException { analyzeStatus.setStatus(StatsConstants.ScheduleStatus.PENDING); GlobalStateMgr.getCurrentState().getAnalyzeMgr().replayAddAnalyzeStatus(analyzeStatus); - int timeout = context.getSessionVariable().getQueryTimeoutS(); + int queryTimeout = context.getSessionVariable().getQueryTimeoutS(); + int insertTimeout = context.getSessionVariable().getInsertTimeoutS(); try { Future future = GlobalStateMgr.getCurrentState().getAnalyzeMgr().getAnalyzeTaskThreadPool() .submit(() -> executeAnalyze(analyzeStmt, analyzeStatus, db, table)); @@ -1303,6 +1304,7 @@ private void handleAnalyzeStmt() throws IOException { // will print warning log if timeout, so we update timeout temporarily to avoid // warning log context.getSessionVariable().setQueryTimeoutS((int) Config.statistic_collect_query_timeout); + context.getSessionVariable().setInsertTimeoutS((int) Config.statistic_collect_query_timeout); future.get(); } } catch (RejectedExecutionException e) { @@ -1314,7 +1316,8 @@ private void handleAnalyzeStmt() throws IOException { analyzeStatus.setReason("The statistics tasks running failed"); GlobalStateMgr.getCurrentState().getAnalyzeMgr().addAnalyzeStatus(analyzeStatus); } finally { - context.getSessionVariable().setQueryTimeoutS(timeout); + context.getSessionVariable().setQueryTimeoutS(queryTimeout); + context.getSessionVariable().setInsertTimeoutS(insertTimeout); } ShowResultSet resultSet = analyzeStatus.toShowResult(); @@ -2167,7 +2170,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { estimateFileNum, estimateScanFileSize, type, - ConnectContext.get().getSessionVariable().getQueryTimeoutS(), + getExecTimeout(), coord); loadJob.setJobProperties(stmt.getProperties()); jobId = loadJob.getId(); @@ -2192,8 +2195,9 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { coord.setTopProfileSupplier(this::buildTopLevelProfile); coord.setExecPlan(execPlan); - long jobDeadLineMs = System.currentTimeMillis() + context.getSessionVariable().getQueryTimeoutS() * 1000; - coord.join(context.getSessionVariable().getQueryTimeoutS()); + int timeout = getExecTimeout(); + long jobDeadLineMs = System.currentTimeMillis() + timeout * 1000; + coord.join(timeout); if (!coord.isDone()) { /* * In this case, There are two factors that lead query cancelled: @@ -2214,16 +2218,16 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { coord.cancel(ErrorCode.ERR_QUERY_EXCEPTION.formatErrorMsg()); ErrorReport.reportDdlException(ErrorCode.ERR_QUERY_EXCEPTION); } else { - coord.cancel(ErrorCode.ERR_QUERY_TIMEOUT.formatErrorMsg()); + coord.cancel(ErrorCode.ERR_TIMEOUT.formatErrorMsg(getExecType(), timeout, "")); if (coord.isThriftServerHighLoad()) { - ErrorReport.reportDdlException(ErrorCode.ERR_QUERY_TIMEOUT, + ErrorReport.reportDdlException(ErrorCode.ERR_TIMEOUT, getExecType(), timeout, "Please check the thrift-server-pool metrics, " + "if the pool size reaches thrift_server_max_worker_threads(default is 4096), " + "you can set the config to a higher value in fe.conf, " + "or set parallel_fragment_exec_instance_num to a lower value in session variable"); } else { - ErrorReport.reportDdlException(ErrorCode.ERR_QUERY_TIMEOUT, - "Increase the query_timeout session variable and retry"); + ErrorReport.reportDdlException(ErrorCode.ERR_TIMEOUT, getExecType(), timeout, + "please increase the 'insert_timeout' session variable"); } } } @@ -2689,4 +2693,28 @@ public void addFinishedQueryDetail() { QueryDetailQueue.addQueryDetail(queryDetail); } + public int getExecTimeout() { + if (parsedStmt instanceof DmlStmt) { + int timeout = ((DmlStmt) parsedStmt).getTimeout(); + return timeout != -1 ? timeout : context.getSessionVariable().getInsertTimeoutS(); + } + if (parsedStmt instanceof CreateTableAsSelectStmt) { + InsertStmt insertStmt = ((CreateTableAsSelectStmt) parsedStmt).getInsertStmt(); + int timeout = insertStmt.getTimeout(); + return timeout != -1 ? timeout : context.getSessionVariable().getInsertTimeoutS(); + } + return context.getSessionVariable().getQueryTimeoutS(); + } + + public String getExecType() { + if (parsedStmt instanceof InsertStmt || parsedStmt instanceof CreateTableAsSelectStmt) { + return "Insert"; + } else if (parsedStmt instanceof UpdateStmt) { + return "Update"; + } else if (parsedStmt instanceof DeleteStmt) { + return "Delete"; + } else { + return "Query"; + } + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java index 43cdd65b64c227..00135f133ad7b5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java @@ -104,6 +104,7 @@ public static JobSpec fromQuerySpec(ConnectContext context, TQueryType queryType) { TQueryOptions queryOptions = context.getSessionVariable().toThrift(); queryOptions.setQuery_type(queryType); + queryOptions.setQuery_timeout(context.getExecTimeout()); TQueryGlobals queryGlobals = genQueryGlobals(context.getStartTimeInstant(), context.getSessionVariable().getTimeZone()); diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java index bdb6f46550b978..765b419df311f2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java @@ -577,7 +577,7 @@ private void changeDefaultConnectContextIfNeeded(ConnectContext mvConnectCtx) { // change `query_timeout` to 1 hour by default for better user experience. if (!mvProperty.getProperties().containsKey(MV_SESSION_TIMEOUT)) { - mvSessionVariable.setQueryTimeoutS(MV_DEFAULT_QUERY_TIMEOUT); + mvSessionVariable.setInsertTimeoutS(MV_DEFAULT_QUERY_TIMEOUT); } // set insert_max_filter_ratio by default diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVMaintenanceJob.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVMaintenanceJob.java index edf32f3da781c4..219f28505a0aaa 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVMaintenanceJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVMaintenanceJob.java @@ -267,7 +267,7 @@ void buildPhysicalTopology() throws Exception { // NOTE use a fake transaction id, the real one would be generated when epoch started long fakeTransactionId = 1; long dbId = getView().getDbId(); - long timeout = context.getSessionVariable().getQueryTimeoutS(); + long timeout = context.getExecTimeout(); dataSink.init(context.getExecutionId(), fakeTransactionId, dbId, timeout); dataSink.complete(); } diff --git a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java index 4ee05085788fc2..bebb6b74445bca 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java @@ -2350,7 +2350,7 @@ public Future refreshOtherFesTable(TNetworkAddress thriftAddress, Table if (ConnectContext.get() == null || ConnectContext.get().getSessionVariable() == null) { timeout = Config.thrift_rpc_timeout_ms * 10; } else { - timeout = ConnectContext.get().getSessionVariable().getQueryTimeoutS() * 1000 + Config.thrift_rpc_timeout_ms; + timeout = ConnectContext.get().getExecTimeout() * 1000 + Config.thrift_rpc_timeout_ms; } FutureTask task = new FutureTask(() -> { diff --git a/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java index 7d83b1bc762d6b..c0c054dd0143c1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java @@ -1127,8 +1127,7 @@ public void setConfig(AdminSetConfigStmt stmt) throws DdlException { setFrontendConfig(stmt.getConfig().getMap()); List allFrontends = getFrontends(null); - int timeout = ConnectContext.get().getSessionVariable().getQueryTimeoutS() * 1000 - + Config.thrift_rpc_timeout_ms; + int timeout = ConnectContext.get().getExecTimeout() * 1000 + Config.thrift_rpc_timeout_ms; StringBuilder errMsg = new StringBuilder(); for (Frontend fe : allFrontends) { if (fe.getHost().equals(getSelfNode().first)) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/DeletePlanner.java b/fe/fe-core/src/main/java/com/starrocks/sql/DeletePlanner.java index 953881c7f56507..a19d07113e477e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/DeletePlanner.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/DeletePlanner.java @@ -120,8 +120,7 @@ public ExecPlan plan(DeleteStmt deleteStatement, ConnectContext session) { Database db = GlobalStateMgr.getCurrentState().getMetadataMgr().getDb(catalogDbTable.getCatalog(), catalogDbTable.getDb()); try { - olapTableSink.init(session.getExecutionId(), deleteStatement.getTxnId(), db.getId(), - ConnectContext.get().getSessionVariable().getQueryTimeoutS()); + olapTableSink.init(session.getExecutionId(), deleteStatement.getTxnId(), db.getId(), session.getExecTimeout()); olapTableSink.complete(); } catch (UserException e) { throw new SemanticException(e.getMessage()); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java b/fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java index 226749f433664b..5c8461cdf611a1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java @@ -393,8 +393,7 @@ public ExecPlan plan(InsertStmt insertStmt, ConnectContext session) { Database db = GlobalStateMgr.getCurrentState().getMetadataMgr().getDb(catalogDbTable.getCatalog(), catalogDbTable.getDb()); try { - olapTableSink.init(session.getExecutionId(), insertStmt.getTxnId(), db.getId(), - ConnectContext.get().getSessionVariable().getQueryTimeoutS()); + olapTableSink.init(session.getExecutionId(), insertStmt.getTxnId(), db.getId(), session.getExecTimeout()); olapTableSink.complete(); } catch (UserException e) { throw new SemanticException(e.getMessage()); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java b/fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java index 2f35607191b033..3eb64d3294530f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java @@ -490,7 +490,7 @@ private static void beginTransaction(DmlStmt stmt, ConnectContext session) Lists.newArrayList(tbl.getSourceTableId()), label, sourceType, - session.getSessionVariable().getQueryTimeoutS(), + session.getExecTimeout(), tbl.getSourceTableHost(), tbl.getSourceTablePort(), authenticateParams); @@ -507,7 +507,7 @@ private static void beginTransaction(DmlStmt stmt, ConnectContext session) new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), sourceType, - session.getSessionVariable().getQueryTimeoutS(), + session.getExecTimeout(), warehouseId); // add table indexes to transaction state diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/UpdatePlanner.java b/fe/fe-core/src/main/java/com/starrocks/sql/UpdatePlanner.java index 72b0e86156b992..5cbe46a050a343 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/UpdatePlanner.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/UpdatePlanner.java @@ -155,8 +155,7 @@ public ExecPlan plan(UpdateStmt updateStmt, ConnectContext session) { Database db = GlobalStateMgr.getCurrentState().getMetadataMgr().getDb(catalogDbTable.getCatalog(), catalogDbTable.getDb()); try { - olapTableSink.init(session.getExecutionId(), updateStmt.getTxnId(), db.getId(), - ConnectContext.get().getSessionVariable().getQueryTimeoutS()); + olapTableSink.init(session.getExecutionId(), updateStmt.getTxnId(), db.getId(), session.getExecTimeout()); olapTableSink.complete(); } catch (UserException e) { throw new SemanticException(e.getMessage()); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/DeleteAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/DeleteAnalyzer.java index 27b3913f287168..983b2f0d6d52bd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/DeleteAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/DeleteAnalyzer.java @@ -190,6 +190,7 @@ private static void analyzeProperties(DeleteStmt deleteStmt, ConnectContext sess properties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(session.getSessionVariable().getInsertMaxFilterRatio())); properties.put(LoadStmt.STRICT_MODE, String.valueOf(session.getSessionVariable().getEnableInsertStrict())); + properties.put(LoadStmt.TIMEOUT_PROPERTY, String.valueOf(session.getSessionVariable().getInsertTimeoutS())); } public static void analyze(DeleteStmt deleteStatement, ConnectContext session) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java index d8269884cefea3..e228cfca09670c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java @@ -290,15 +290,16 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext private static void analyzeProperties(InsertStmt insertStmt, ConnectContext session) { Map properties = insertStmt.getProperties(); - // use session variable if not set max_filter_ratio property + // use session variable if not set max_filter_ratio, strict_mode, timeout property if (!properties.containsKey(LoadStmt.MAX_FILTER_RATIO_PROPERTY)) { properties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(session.getSessionVariable().getInsertMaxFilterRatio())); } - // use session variable if not set strict_mode property - if (!properties.containsKey(LoadStmt.STRICT_MODE) && - session.getSessionVariable().getEnableInsertStrict()) { - properties.put(LoadStmt.STRICT_MODE, "true"); + if (!properties.containsKey(LoadStmt.STRICT_MODE)) { + properties.put(LoadStmt.STRICT_MODE, String.valueOf(session.getSessionVariable().getEnableInsertStrict())); + } + if (!properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) { + properties.put(LoadStmt.TIMEOUT_PROPERTY, String.valueOf(session.getSessionVariable().getInsertTimeoutS())); } try { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/UpdateAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/UpdateAnalyzer.java index 5f24949478ccaa..3254ecf2b20729 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/UpdateAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/UpdateAnalyzer.java @@ -65,6 +65,7 @@ private static void analyzeProperties(UpdateStmt updateStmt, ConnectContext sess properties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(session.getSessionVariable().getInsertMaxFilterRatio())); properties.put(LoadStmt.STRICT_MODE, String.valueOf(session.getSessionVariable().getEnableInsertStrict())); + properties.put(LoadStmt.TIMEOUT_PROPERTY, String.valueOf(session.getSessionVariable().getInsertTimeoutS())); } public static void analyze(UpdateStmt updateStmt, ConnectContext session) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/DmlStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/DmlStmt.java index 473d01c2863edb..1bf5f2bc68e249 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/DmlStmt.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/DmlStmt.java @@ -51,4 +51,16 @@ public void setTxnId(long txnId) { public Map getProperties() { return properties; } + + public int getTimeout() { + int timeout = -1; + if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) { + try { + timeout = Integer.parseInt(properties.get(LoadStmt.TIMEOUT_PROPERTY)); + } catch (NumberFormatException e) { + // ignore + } + } + return timeout; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java index 24a32ea4b67cff..9e03c7d495f3c2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java @@ -105,6 +105,7 @@ public static ConnectContext buildConnectContext() { context.getSessionVariable().setEnableLoadProfile(false); context.getSessionVariable().setParallelExecInstanceNum(1); context.getSessionVariable().setQueryTimeoutS((int) Config.statistic_collect_query_timeout); + context.getSessionVariable().setInsertTimeoutS((int) Config.statistic_collect_query_timeout); context.getSessionVariable().setEnablePipelineEngine(true); context.getSessionVariable().setCboCteReuse(true); context.getSessionVariable().setCboCTERuseRatio(0); diff --git a/test/sql/test_insert_empty/R/test_insert_timeout b/test/sql/test_insert_empty/R/test_insert_timeout new file mode 100644 index 00000000000000..810d83db547ec4 --- /dev/null +++ b/test/sql/test_insert_empty/R/test_insert_timeout @@ -0,0 +1,63 @@ +-- name: test_insert_timeout + +create database db_${uuid0}; +use db_${uuid0}; + +set query_timeout = 2; +select sleep(4); +-- result: +[REGEX].*Query reached its timeout of 2 seconds, please increase the 'query_timeout' session variable.* +-- !result +set query_timeout = 300; + +create table t1(k1 int); + +set insert_timeout = 2; +insert into t1 select sleep(4); +-- result: +[REGEX].*Insert reached its timeout of 2 seconds, please increase the 'insert_timeout' session variable.* +-- !result + +set insert_timeout = 10; +insert into t1 select sleep(4); +-- result: +-- !result +select * from t1; +-- result: +1 +-- !result +truncate table t1; +-- result: +-- !result + +set insert_timeout = 2; +insert into t1 properties("timeout" = "10") select sleep(4); +-- result: +-- !result +select * from t1; +-- result: +1 +-- !result +truncate table t1; +-- result: +-- !result + +set insert_timeout = 2; +create table t2 as select sleep(4) as k1; +-- result: +[REGEX].*Insert reached its timeout of 2 seconds, please increase the 'insert_timeout' session variable.* +-- !result + +set insert_timeout = 10; +create table t2 as select sleep(4) as k1; +-- result: +-- !result +select * from t2; +-- result: +1 +-- !result +truncate table t2; +-- result: +-- !result + +set insert_timeout = 14400; diff --git a/test/sql/test_insert_empty/T/test_insert_timeout b/test/sql/test_insert_empty/T/test_insert_timeout new file mode 100644 index 00000000000000..5f933a354db592 --- /dev/null +++ b/test/sql/test_insert_empty/T/test_insert_timeout @@ -0,0 +1,33 @@ +-- name: test_insert_timeout + +create database db_${uuid0}; +use db_${uuid0}; + +set query_timeout = 2; +select sleep(4); +set query_timeout = 300; + +create table t1(k1 int); + +set insert_timeout = 2; +insert into t1 select sleep(4); + +set insert_timeout = 10; +insert into t1 select sleep(4); +select * from t1; +truncate table t1; + +set insert_timeout = 2; +insert into t1 properties("timeout" = "10") select sleep(4); +select * from t1; +truncate table t1; + +set insert_timeout = 2; +create table t2 as select sleep(4) as k1; + +set insert_timeout = 10; +create table t2 as select sleep(4) as k1; +select * from t2; +truncate table t2; + +set insert_timeout = 14400;