From a36b03e8b2d67415e20e9e000d69e5b9a0088eae Mon Sep 17 00:00:00 2001 From: wyb Date: Fri, 30 Aug 2024 11:12:49 +0800 Subject: [PATCH 1/8] [Enhancement] Support insert timeout Signed-off-by: wyb --- .../com/starrocks/catalog/DictionaryMgr.java | 2 +- .../java/com/starrocks/common/ErrorCode.java | 3 +- .../datacache/DataCacheSelectExecutor.java | 2 +- .../java/com/starrocks/qe/ConnectContext.java | 33 ++++++++-- .../com/starrocks/qe/LeaderOpExecutor.java | 8 +-- .../java/com/starrocks/qe/ResultReceiver.java | 21 ++++--- .../com/starrocks/qe/SessionVariable.java | 12 ++++ .../qe/ShortCircuitHybridExecutor.java | 3 +- .../java/com/starrocks/qe/StmtExecutor.java | 46 +++++++++++--- .../starrocks/qe/scheduler/dag/JobSpec.java | 1 + .../PartitionBasedMvRefreshProcessor.java | 2 +- .../scheduler/mv/MVMaintenanceJob.java | 2 +- .../com/starrocks/server/GlobalStateMgr.java | 2 +- .../java/com/starrocks/server/NodeMgr.java | 3 +- .../java/com/starrocks/sql/DeletePlanner.java | 3 +- .../java/com/starrocks/sql/InsertPlanner.java | 3 +- .../com/starrocks/sql/StatementPlanner.java | 4 +- .../java/com/starrocks/sql/UpdatePlanner.java | 3 +- .../sql/analyzer/DeleteAnalyzer.java | 1 + .../sql/analyzer/InsertAnalyzer.java | 6 +- .../sql/analyzer/UpdateAnalyzer.java | 1 + .../java/com/starrocks/sql/ast/DmlStmt.java | 12 +++- .../starrocks/statistic/StatisticUtils.java | 1 + .../test_insert_empty/R/test_insert_timeout | 63 +++++++++++++++++++ .../test_insert_empty/T/test_insert_timeout | 33 ++++++++++ 25 files changed, 219 insertions(+), 51 deletions(-) create mode 100644 test/sql/test_insert_empty/R/test_insert_timeout create mode 100644 test/sql/test_insert_empty/T/test_insert_timeout diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/DictionaryMgr.java b/fe/fe-core/src/main/java/com/starrocks/catalog/DictionaryMgr.java index 4fb68f1ccfb0f..a676890bdc7c9 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/DictionaryMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/DictionaryMgr.java @@ -630,7 +630,7 @@ private void execute(ExecPlan execPlan, ConnectContext context) throws Exception fragments, scanNodes); QeProcessorImpl.INSTANCE.registerQuery(queryId, coord); - int leftTimeSecond = context.getSessionVariable().getQueryTimeoutS(); + int leftTimeSecond = context.getExecTimeout(); coord.setTimeoutSecond(leftTimeSecond); coord.exec(); diff --git a/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java b/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java index e9d7589180ce4..0f769cf1c276e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java @@ -144,8 +144,7 @@ public enum ErrorCode { ERR_INVALID_VALUE(5018, new byte[] {'H', 'Y', '0', '0', '0'}, "Invalid %s: '%s'. Expected values should be %s"), ERR_NO_ALTER_OPERATION(5023, new byte[] {'H', 'Y', '0', '0', '0'}, "No operation in alter statement"), - ERR_QUERY_TIMEOUT(5024, new byte[] {'H', 'Y', '0', '0', '0'}, - "Query timeout. %s"), + ERR_TIMEOUT(5024, new byte[] {'H', 'Y', '0', '0', '0'}, "%s reached its timeout of %d seconds, %s"), ERR_FAILED_WHEN_INSERT(5025, new byte[] {'H', 'Y', '0', '0', '0'}, "Failed when INSERT execute"), ERR_UNSUPPORTED_TYPE_IN_CTAS(5026, new byte[] {'H', 'Y', '0', '0', '0'}, "Unsupported type '%s' in create table as select statement"), diff --git a/fe/fe-core/src/main/java/com/starrocks/datacache/DataCacheSelectExecutor.java b/fe/fe-core/src/main/java/com/starrocks/datacache/DataCacheSelectExecutor.java index c2780a47a9d3c..ae2b318cf9c1e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/datacache/DataCacheSelectExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/datacache/DataCacheSelectExecutor.java @@ -76,7 +76,7 @@ public static DataCacheSelectMetrics cacheSelect(DataCacheSelectStatement statem DataCacheSelectMetrics metrics = null; Coordinator coordinator = stmtExecutor.getCoordinator(); Preconditions.checkNotNull(coordinator, "Coordinator can't be null"); - coordinator.join(connectContext.getSessionVariable().getQueryTimeoutS()); + coordinator.join(stmtExecutor.getExecTimeout()); if (coordinator.isDone()) { metrics = stmtExecutor.getCoordinator().getDataCacheSelectMetrics(); } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java index 4306db37833af..b32488b79bf4f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java @@ -66,6 +66,7 @@ import com.starrocks.sql.analyzer.Authorizer; import com.starrocks.sql.analyzer.SemanticException; import com.starrocks.sql.ast.CleanTemporaryTableStmt; +import com.starrocks.sql.ast.DmlStmt; import com.starrocks.sql.ast.SetListItem; import com.starrocks.sql.ast.SetStmt; import com.starrocks.sql.ast.SetType; @@ -947,6 +948,18 @@ public void kill(boolean killConnection, String cancelledMessage) { } } + public int getExecTimeout() { + return executor != null ? executor.getExecTimeout() : sessionVariable.getQueryTimeoutS(); + } + + private String getExecType() { + return executor != null ? executor.getExecType() : "Query"; + } + + private boolean isExecLoadType() { + return executor != null && executor.getParsedStmt() instanceof DmlStmt; + } + public void checkTimeout(long now) { long startTimeMillis = getStartTime(); if (startTimeMillis <= 0) { @@ -960,27 +973,35 @@ public void checkTimeout(long now) { if (executor != null) { sql = executor.getOriginStmtInString(); } + String errMsg = ""; if (command == MysqlCommand.COM_SLEEP) { - if (delta > sessionVariable.getWaitTimeoutS() * 1000L) { + int waitTimeout = sessionVariable.getWaitTimeoutS(); + if (delta > waitTimeout * 1000L) { // Need kill this connection. LOG.warn("kill wait timeout connection, remote: {}, wait timeout: {}, query id: {}, sql: {}", - getMysqlChannel().getRemoteHostPortString(), sessionVariable.getWaitTimeoutS(), queryId, sql); + getMysqlChannel().getRemoteHostPortString(), waitTimeout, queryId, sql); killFlag = true; killConnection = true; + + errMsg = String.format("Connection reached its wait timeout of %d seconds", waitTimeout); } } else { - long timeoutSecond = sessionVariable.getQueryTimeoutS(); + long timeoutSecond = getExecTimeout(); if (delta > timeoutSecond * 1000L) { - LOG.warn("kill query timeout, remote: {}, query timeout: {}, query id: {}, sql: {}", - getMysqlChannel().getRemoteHostPortString(), sessionVariable.getQueryTimeoutS(), queryId, sql); + LOG.warn("kill {} timeout, remote: {}, query timeout: {}, query id: {}, sql: {}", getExecType(), + getMysqlChannel().getRemoteHostPortString(), timeoutSecond, queryId, sql); // Only kill killFlag = true; + + String suggestedMsg = String.format("please increase the '%s' session variable", + isExecLoadType() ? "insert_timeout" : "query_timeout"); + errMsg = ErrorCode.ERR_TIMEOUT.formatErrorMsg(getExecType(), timeoutSecond, suggestedMsg); } } if (killFlag) { - kill(killConnection, "query timeout"); + kill(killConnection, errMsg); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java index 9eb3202c66481..2cb88a9ae57f3 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/LeaderOpExecutor.java @@ -97,15 +97,15 @@ public LeaderOpExecutor(Pair ipAndPort, StatementBase parsedStm this.originStmt = originStmt; this.ctx = ctx; if (status.isNeedToWaitJournalSync()) { - this.waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000; + this.waitTimeoutMs = ctx.getExecTimeout() * 1000; } else { this.waitTimeoutMs = 0; } - // set thriftTimeoutMs to query_timeout + thrift_rpc_timeout_ms + // set thriftTimeoutMs to exec timeout + thrift_rpc_timeout_ms // so that we can return an execution timeout instead of a network timeout - this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000 + Config.thrift_rpc_timeout_ms; + this.thriftTimeoutMs = ctx.getExecTimeout() * 1000 + Config.thrift_rpc_timeout_ms; if (this.thriftTimeoutMs < 0) { - this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000; + this.thriftTimeoutMs = ctx.getExecTimeout() * 1000; } this.parsedStmt = parsedStmt; } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java b/fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java index c453ffb198e3a..7b7367f5065ed 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java @@ -34,6 +34,7 @@ package com.starrocks.qe; +import com.starrocks.common.ErrorCode; import com.starrocks.common.Status; import com.starrocks.common.util.DebugUtil; import com.starrocks.proto.PFetchDataResult; @@ -61,11 +62,11 @@ public class ResultReceiver { private volatile boolean isDone = false; private volatile boolean isCancel = false; private long packetIdx = 0; - private final long timeoutTs; + private final int timeoutMs; + private final long deadlineMs; private final TNetworkAddress address; private final PUniqueId finstId; private final Long backendId; - private Thread currentThread; public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, int timeoutMs) { this.finstId = new PUniqueId(); @@ -73,7 +74,8 @@ public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, in this.finstId.lo = tid.lo; this.backendId = backendId; this.address = address; - this.timeoutTs = System.currentTimeMillis() + timeoutMs; + this.timeoutMs = timeoutMs; + this.deadlineMs = System.currentTimeMillis() + timeoutMs; } public RowBatch getNext(Status status) throws TException { @@ -89,11 +91,11 @@ public RowBatch getNext(Status status) throws TException { PFetchDataResult pResult = null; while (pResult == null) { long currentTs = System.currentTimeMillis(); - if (currentTs >= timeoutTs) { + if (currentTs >= deadlineMs) { throw new TimeoutException("query timeout"); } try { - pResult = future.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS); + pResult = future.get(deadlineMs - currentTs, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { // continue to get result LOG.info("future get interrupted Exception"); @@ -138,17 +140,16 @@ public RowBatch getNext(Status status) throws TException { LOG.warn("fetch result execution exception, finstId={}", DebugUtil.printId(finstId), e); if (e.getMessage().contains("time out")) { // if timeout, we set error code to TIMEOUT, and it will not retry querying. - status.setStatus(new Status(TStatusCode.TIMEOUT, - String.format("Query exceeded time limit of %d seconds", - ConnectContext.get().getSessionVariable().getQueryTimeoutS()))); + status.setStatus(new Status(TStatusCode.TIMEOUT, ErrorCode.ERR_TIMEOUT.formatErrorMsg( + "Query", timeoutMs / 1000, "please increase the 'query_timeout' session variable and retry"))); } else { status.setRpcStatus(e.getMessage()); SimpleScheduler.addToBlocklist(backendId); } } catch (TimeoutException e) { LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e); - status.setTimeOutStatus(String.format("Query exceeded time limit of %d seconds", - ConnectContext.get().getSessionVariable().getQueryTimeoutS())); + status.setTimeOutStatus(ErrorCode.ERR_TIMEOUT.formatErrorMsg( + "Query", timeoutMs / 1000, "please increase the 'query_timeout' session variable and retry")); } if (isCancel) { diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index 397b96570eeeb..f6fc442f8ef61 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -191,6 +191,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable { public static final String MAX_PARALLEL_SCAN_INSTANCE_NUM = "max_parallel_scan_instance_num"; public static final String ENABLE_INSERT_STRICT = "enable_insert_strict"; public static final String INSERT_MAX_FILTER_RATIO = "insert_max_filter_ratio"; + public static final String INSERT_TIMEOUT = "insert_timeout"; public static final String DYNAMIC_OVERWRITE = "dynamic_overwrite"; public static final String ENABLE_SPILL = "enable_spill"; public static final String ENABLE_SPILL_TO_REMOTE_STORAGE = "enable_spill_to_remote_storage"; @@ -1168,6 +1169,9 @@ public static MaterializedViewRewriteMode parse(String str) { @VariableMgr.VarAttr(name = INSERT_MAX_FILTER_RATIO) private double insertMaxFilterRatio = 0; + @VariableMgr.VarAttr(name = INSERT_TIMEOUT) + private int insertTimeoutS = 14400; + @VariableMgr.VarAttr(name = ENABLE_SPILL) private boolean enableSpill = false; @@ -2864,6 +2868,14 @@ public void setInsertMaxFilterRatio(double insertMaxFilterRatio) { this.insertMaxFilterRatio = insertMaxFilterRatio; } + public int getInsertTimeoutS() { + return insertTimeoutS; + } + + public void setInsertTimeoutS(int insertTimeoutS) { + this.insertTimeoutS = insertTimeoutS; + } + public boolean isEnableSpill() { return enableSpill; } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ShortCircuitHybridExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/ShortCircuitHybridExecutor.java index 298429a4055b5..2c1597fd3f21a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ShortCircuitHybridExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ShortCircuitHybridExecutor.java @@ -113,8 +113,7 @@ public void exec() throws UserException { if (null == future) { return; } - PExecShortCircuitResult shortCircuitResult = future.get( - context.getSessionVariable().getQueryTimeoutS(), TimeUnit.SECONDS); + PExecShortCircuitResult shortCircuitResult = future.get(context.getExecTimeout(), TimeUnit.SECONDS); watch.stop(); long t = watch.elapsed().toMillis(); MetricRepo.HISTO_SHORTCIRCUIT_RPC_LATENCY.update(t); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index 24972fe959401..8307f4377f2f6 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -1371,7 +1371,8 @@ private void handleAnalyzeStmt() throws IOException { analyzeStatus.setStatus(StatsConstants.ScheduleStatus.PENDING); GlobalStateMgr.getCurrentState().getAnalyzeMgr().replayAddAnalyzeStatus(analyzeStatus); - int timeout = context.getSessionVariable().getQueryTimeoutS(); + int queryTimeout = context.getSessionVariable().getQueryTimeoutS(); + int insertTimeout = context.getSessionVariable().getInsertTimeoutS(); try { Future future = GlobalStateMgr.getCurrentState().getAnalyzeMgr().getAnalyzeTaskThreadPool() .submit(() -> executeAnalyze(analyzeStmt, analyzeStatus, db, table)); @@ -1381,6 +1382,7 @@ private void handleAnalyzeStmt() throws IOException { // will print warning log if timeout, so we update timeout temporarily to avoid // warning log context.getSessionVariable().setQueryTimeoutS((int) Config.statistic_collect_query_timeout); + context.getSessionVariable().setInsertTimeoutS((int) Config.statistic_collect_query_timeout); future.get(); } } catch (RejectedExecutionException e) { @@ -1394,7 +1396,8 @@ private void handleAnalyzeStmt() throws IOException { LOG.warn("analyze statement failed {}", analyzeStmt.toString(), e); GlobalStateMgr.getCurrentState().getAnalyzeMgr().addAnalyzeStatus(analyzeStatus); } finally { - context.getSessionVariable().setQueryTimeoutS(timeout); + context.getSessionVariable().setQueryTimeoutS(queryTimeout); + context.getSessionVariable().setInsertTimeoutS(insertTimeout); } ShowResultSet resultSet = analyzeStatus.toShowResult(); @@ -2282,7 +2285,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { estimateFileNum, estimateScanFileSize, type, - ConnectContext.get().getSessionVariable().getQueryTimeoutS(), + getExecTimeout(), coord); loadJob.setJobProperties(stmt.getProperties()); jobId = loadJob.getId(); @@ -2307,8 +2310,9 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { coord.setTopProfileSupplier(this::buildTopLevelProfile); coord.setExecPlan(execPlan); - long jobDeadLineMs = System.currentTimeMillis() + context.getSessionVariable().getQueryTimeoutS() * 1000; - coord.join(context.getSessionVariable().getQueryTimeoutS()); + int timeout = getExecTimeout(); + long jobDeadLineMs = System.currentTimeMillis() + timeout * 1000; + coord.join(timeout); if (!coord.isDone()) { /* * In this case, There are two factors that lead query cancelled: @@ -2329,16 +2333,16 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { coord.cancel(ErrorCode.ERR_QUERY_EXCEPTION.formatErrorMsg()); ErrorReport.reportNoAliveBackendException(ErrorCode.ERR_QUERY_EXCEPTION); } else { - coord.cancel(ErrorCode.ERR_QUERY_TIMEOUT.formatErrorMsg()); + coord.cancel(ErrorCode.ERR_TIMEOUT.formatErrorMsg(getExecType(), timeout, "")); if (coord.isThriftServerHighLoad()) { - ErrorReport.reportTimeoutException(ErrorCode.ERR_QUERY_TIMEOUT, + ErrorReport.reportTimeoutException(ErrorCode.ERR_TIMEOUT, getExecType(), timeout, "Please check the thrift-server-pool metrics, " + "if the pool size reaches thrift_server_max_worker_threads(default is 4096), " + "you can set the config to a higher value in fe.conf, " + "or set parallel_fragment_exec_instance_num to a lower value in session variable"); } else { - ErrorReport.reportTimeoutException(ErrorCode.ERR_QUERY_TIMEOUT, - "Increase the query_timeout session variable and retry"); + ErrorReport.reportTimeoutException(ErrorCode.ERR_TIMEOUT, getExecType(), timeout, + "please increase the 'insert_timeout' session variable and retry"); } } } @@ -2812,4 +2816,28 @@ public void addFinishedQueryDetail() { QueryDetailQueue.addQueryDetail(queryDetail); } + public int getExecTimeout() { + if (parsedStmt instanceof DmlStmt) { + int timeout = ((DmlStmt) parsedStmt).getTimeout(); + return timeout != -1 ? timeout : context.getSessionVariable().getInsertTimeoutS(); + } + if (parsedStmt instanceof CreateTableAsSelectStmt) { + InsertStmt insertStmt = ((CreateTableAsSelectStmt) parsedStmt).getInsertStmt(); + int timeout = insertStmt.getTimeout(); + return timeout != -1 ? timeout : context.getSessionVariable().getInsertTimeoutS(); + } + return context.getSessionVariable().getQueryTimeoutS(); + } + + public String getExecType() { + if (parsedStmt instanceof InsertStmt || parsedStmt instanceof CreateTableAsSelectStmt) { + return "Insert"; + } else if (parsedStmt instanceof UpdateStmt) { + return "Update"; + } else if (parsedStmt instanceof DeleteStmt) { + return "Delete"; + } else { + return "Query"; + } + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java index 5bffed0780964..58a7a6fbf5e5e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java @@ -106,6 +106,7 @@ public static JobSpec fromQuerySpec(ConnectContext context, TQueryType queryType) { TQueryOptions queryOptions = context.getSessionVariable().toThrift(); queryOptions.setQuery_type(queryType); + queryOptions.setQuery_timeout(context.getExecTimeout()); TQueryGlobals queryGlobals = genQueryGlobals(context.getStartTimeInstant(), context.getSessionVariable().getTimeZone()); diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java index 8085149e25561..9a15bcea276d7 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java @@ -582,7 +582,7 @@ private void changeDefaultConnectContextIfNeeded(ConnectContext mvConnectCtx) { // change `query_timeout` to 1 hour by default for better user experience. if (!mvProperty.getProperties().containsKey(MV_SESSION_TIMEOUT)) { - mvSessionVariable.setQueryTimeoutS(MV_DEFAULT_QUERY_TIMEOUT); + mvSessionVariable.setInsertTimeoutS(MV_DEFAULT_QUERY_TIMEOUT); } // set insert_max_filter_ratio by default diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVMaintenanceJob.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVMaintenanceJob.java index edf32f3da781c..219f28505a0aa 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVMaintenanceJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVMaintenanceJob.java @@ -267,7 +267,7 @@ void buildPhysicalTopology() throws Exception { // NOTE use a fake transaction id, the real one would be generated when epoch started long fakeTransactionId = 1; long dbId = getView().getDbId(); - long timeout = context.getSessionVariable().getQueryTimeoutS(); + long timeout = context.getExecTimeout(); dataSink.init(context.getExecutionId(), fakeTransactionId, dbId, timeout); dataSink.complete(); } diff --git a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java index ff22460cdfa11..b0857b900c414 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java @@ -2381,7 +2381,7 @@ public Future refreshOtherFesTable(TNetworkAddress thriftAddress, Table if (ConnectContext.get() == null || ConnectContext.get().getSessionVariable() == null) { timeout = Config.thrift_rpc_timeout_ms * 10; } else { - timeout = ConnectContext.get().getSessionVariable().getQueryTimeoutS() * 1000 + Config.thrift_rpc_timeout_ms; + timeout = ConnectContext.get().getExecTimeout() * 1000 + Config.thrift_rpc_timeout_ms; } FutureTask task = new FutureTask(() -> { diff --git a/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java index d421dde75df21..562970a04dfc3 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/NodeMgr.java @@ -1134,8 +1134,7 @@ public void setConfig(AdminSetConfigStmt stmt) throws DdlException { setFrontendConfig(stmt.getConfig().getMap()); List allFrontends = getFrontends(null); - int timeout = ConnectContext.get().getSessionVariable().getQueryTimeoutS() * 1000 - + Config.thrift_rpc_timeout_ms; + int timeout = ConnectContext.get().getExecTimeout() * 1000 + Config.thrift_rpc_timeout_ms; StringBuilder errMsg = new StringBuilder(); for (Frontend fe : allFrontends) { if (fe.getHost().equals(getSelfNode().first)) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/DeletePlanner.java b/fe/fe-core/src/main/java/com/starrocks/sql/DeletePlanner.java index 953881c7f5650..a19d07113e477 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/DeletePlanner.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/DeletePlanner.java @@ -120,8 +120,7 @@ public ExecPlan plan(DeleteStmt deleteStatement, ConnectContext session) { Database db = GlobalStateMgr.getCurrentState().getMetadataMgr().getDb(catalogDbTable.getCatalog(), catalogDbTable.getDb()); try { - olapTableSink.init(session.getExecutionId(), deleteStatement.getTxnId(), db.getId(), - ConnectContext.get().getSessionVariable().getQueryTimeoutS()); + olapTableSink.init(session.getExecutionId(), deleteStatement.getTxnId(), db.getId(), session.getExecTimeout()); olapTableSink.complete(); } catch (UserException e) { throw new SemanticException(e.getMessage()); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java b/fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java index 8b8df97436bab..a0d948d5cbbc9 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java @@ -400,8 +400,7 @@ public ExecPlan plan(InsertStmt insertStmt, ConnectContext session) { Database db = GlobalStateMgr.getCurrentState().getMetadataMgr().getDb(catalogDbTable.getCatalog(), catalogDbTable.getDb()); try { - olapTableSink.init(session.getExecutionId(), insertStmt.getTxnId(), db.getId(), - ConnectContext.get().getSessionVariable().getQueryTimeoutS()); + olapTableSink.init(session.getExecutionId(), insertStmt.getTxnId(), db.getId(), session.getExecTimeout()); olapTableSink.complete(); } catch (UserException e) { throw new SemanticException(e.getMessage()); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java b/fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java index c2d68533f3185..be97738d2885f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/StatementPlanner.java @@ -537,7 +537,7 @@ private static void beginTransaction(DmlStmt stmt, ConnectContext session) Lists.newArrayList(tbl.getSourceTableId()), label, sourceType, - session.getSessionVariable().getQueryTimeoutS(), + session.getExecTimeout(), tbl.getSourceTableHost(), tbl.getSourceTablePort(), authenticateParams); @@ -554,7 +554,7 @@ private static void beginTransaction(DmlStmt stmt, ConnectContext session) new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), sourceType, - session.getSessionVariable().getQueryTimeoutS(), + session.getExecTimeout(), warehouseId); // add table indexes to transaction state diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/UpdatePlanner.java b/fe/fe-core/src/main/java/com/starrocks/sql/UpdatePlanner.java index 72b0e86156b99..5cbe46a050a34 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/UpdatePlanner.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/UpdatePlanner.java @@ -155,8 +155,7 @@ public ExecPlan plan(UpdateStmt updateStmt, ConnectContext session) { Database db = GlobalStateMgr.getCurrentState().getMetadataMgr().getDb(catalogDbTable.getCatalog(), catalogDbTable.getDb()); try { - olapTableSink.init(session.getExecutionId(), updateStmt.getTxnId(), db.getId(), - ConnectContext.get().getSessionVariable().getQueryTimeoutS()); + olapTableSink.init(session.getExecutionId(), updateStmt.getTxnId(), db.getId(), session.getExecTimeout()); olapTableSink.complete(); } catch (UserException e) { throw new SemanticException(e.getMessage()); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/DeleteAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/DeleteAnalyzer.java index 27b3913f28716..983b2f0d6d52b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/DeleteAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/DeleteAnalyzer.java @@ -190,6 +190,7 @@ private static void analyzeProperties(DeleteStmt deleteStmt, ConnectContext sess properties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(session.getSessionVariable().getInsertMaxFilterRatio())); properties.put(LoadStmt.STRICT_MODE, String.valueOf(session.getSessionVariable().getEnableInsertStrict())); + properties.put(LoadStmt.TIMEOUT_PROPERTY, String.valueOf(session.getSessionVariable().getInsertTimeoutS())); } public static void analyze(DeleteStmt deleteStatement, ConnectContext session) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java index 2fa0e60347961..332cb6aead7c7 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/InsertAnalyzer.java @@ -357,15 +357,17 @@ private static void analyzeProperties(InsertStmt insertStmt, ConnectContext sess } // check common properties - // use session variable if not set max_filter_ratio property + // use session variable if not set max_filter_ratio, strict_mode, timeout property if (!properties.containsKey(LoadStmt.MAX_FILTER_RATIO_PROPERTY)) { properties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(session.getSessionVariable().getInsertMaxFilterRatio())); } - // use session variable if not set strict_mode property if (!properties.containsKey(LoadStmt.STRICT_MODE)) { properties.put(LoadStmt.STRICT_MODE, String.valueOf(session.getSessionVariable().getEnableInsertStrict())); } + if (!properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) { + properties.put(LoadStmt.TIMEOUT_PROPERTY, String.valueOf(session.getSessionVariable().getInsertTimeoutS())); + } try { LoadStmt.checkProperties(properties); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/UpdateAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/UpdateAnalyzer.java index 5f24949478cca..3254ecf2b2072 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/UpdateAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/UpdateAnalyzer.java @@ -65,6 +65,7 @@ private static void analyzeProperties(UpdateStmt updateStmt, ConnectContext sess properties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(session.getSessionVariable().getInsertMaxFilterRatio())); properties.put(LoadStmt.STRICT_MODE, String.valueOf(session.getSessionVariable().getEnableInsertStrict())); + properties.put(LoadStmt.TIMEOUT_PROPERTY, String.valueOf(session.getSessionVariable().getInsertTimeoutS())); } public static void analyze(UpdateStmt updateStmt, ConnectContext session) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/DmlStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/DmlStmt.java index 3e5753da1da07..261361133af1c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/DmlStmt.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/DmlStmt.java @@ -61,7 +61,17 @@ public double getMaxFilterRatio() { // ignore } } - return ConnectContext.get().getSessionVariable().getInsertMaxFilterRatio(); } + + public int getTimeout() { + if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) { + try { + return Integer.parseInt(properties.get(LoadStmt.TIMEOUT_PROPERTY)); + } catch (NumberFormatException e) { + // ignore + } + } + return ConnectContext.get().getSessionVariable().getInsertTimeoutS(); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java index c88591574c906..53367b301acef 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java @@ -103,6 +103,7 @@ public static ConnectContext buildConnectContext() { context.getSessionVariable().setEnableLoadProfile(false); context.getSessionVariable().setParallelExecInstanceNum(1); context.getSessionVariable().setQueryTimeoutS((int) Config.statistic_collect_query_timeout); + context.getSessionVariable().setInsertTimeoutS((int) Config.statistic_collect_query_timeout); context.getSessionVariable().setEnablePipelineEngine(true); context.getSessionVariable().setCboCteReuse(true); context.getSessionVariable().setCboCTERuseRatio(0); diff --git a/test/sql/test_insert_empty/R/test_insert_timeout b/test/sql/test_insert_empty/R/test_insert_timeout new file mode 100644 index 0000000000000..810d83db547ec --- /dev/null +++ b/test/sql/test_insert_empty/R/test_insert_timeout @@ -0,0 +1,63 @@ +-- name: test_insert_timeout + +create database db_${uuid0}; +use db_${uuid0}; + +set query_timeout = 2; +select sleep(4); +-- result: +[REGEX].*Query reached its timeout of 2 seconds, please increase the 'query_timeout' session variable.* +-- !result +set query_timeout = 300; + +create table t1(k1 int); + +set insert_timeout = 2; +insert into t1 select sleep(4); +-- result: +[REGEX].*Insert reached its timeout of 2 seconds, please increase the 'insert_timeout' session variable.* +-- !result + +set insert_timeout = 10; +insert into t1 select sleep(4); +-- result: +-- !result +select * from t1; +-- result: +1 +-- !result +truncate table t1; +-- result: +-- !result + +set insert_timeout = 2; +insert into t1 properties("timeout" = "10") select sleep(4); +-- result: +-- !result +select * from t1; +-- result: +1 +-- !result +truncate table t1; +-- result: +-- !result + +set insert_timeout = 2; +create table t2 as select sleep(4) as k1; +-- result: +[REGEX].*Insert reached its timeout of 2 seconds, please increase the 'insert_timeout' session variable.* +-- !result + +set insert_timeout = 10; +create table t2 as select sleep(4) as k1; +-- result: +-- !result +select * from t2; +-- result: +1 +-- !result +truncate table t2; +-- result: +-- !result + +set insert_timeout = 14400; diff --git a/test/sql/test_insert_empty/T/test_insert_timeout b/test/sql/test_insert_empty/T/test_insert_timeout new file mode 100644 index 0000000000000..5f933a354db59 --- /dev/null +++ b/test/sql/test_insert_empty/T/test_insert_timeout @@ -0,0 +1,33 @@ +-- name: test_insert_timeout + +create database db_${uuid0}; +use db_${uuid0}; + +set query_timeout = 2; +select sleep(4); +set query_timeout = 300; + +create table t1(k1 int); + +set insert_timeout = 2; +insert into t1 select sleep(4); + +set insert_timeout = 10; +insert into t1 select sleep(4); +select * from t1; +truncate table t1; + +set insert_timeout = 2; +insert into t1 properties("timeout" = "10") select sleep(4); +select * from t1; +truncate table t1; + +set insert_timeout = 2; +create table t2 as select sleep(4) as k1; + +set insert_timeout = 10; +create table t2 as select sleep(4) as k1; +select * from t2; +truncate table t2; + +set insert_timeout = 14400; From 2049f2a68c613a058aa6f201957ea236164dd67d Mon Sep 17 00:00:00 2001 From: wyb Date: Tue, 10 Sep 2024 17:00:19 +0800 Subject: [PATCH 2/8] Fix comment Signed-off-by: wyb --- .../java/com/starrocks/qe/ConnectContext.java | 3 +- .../java/com/starrocks/qe/StmtExecutor.java | 45 +++++++++---------- .../sql/ast/CreateTableAsSelectStmt.java | 5 +++ .../com/starrocks/sql/ast/StatementBase.java | 5 +++ 4 files changed, 31 insertions(+), 27 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java index b32488b79bf4f..56b2238e226a0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java @@ -66,7 +66,6 @@ import com.starrocks.sql.analyzer.Authorizer; import com.starrocks.sql.analyzer.SemanticException; import com.starrocks.sql.ast.CleanTemporaryTableStmt; -import com.starrocks.sql.ast.DmlStmt; import com.starrocks.sql.ast.SetListItem; import com.starrocks.sql.ast.SetStmt; import com.starrocks.sql.ast.SetType; @@ -957,7 +956,7 @@ private String getExecType() { } private boolean isExecLoadType() { - return executor != null && executor.getParsedStmt() instanceof DmlStmt; + return executor != null && executor.isExecLoadType(); } public void checkTimeout(long now) { diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index 8307f4377f2f6..12516506b8a9f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -470,6 +470,26 @@ public StatementBase getParsedStmt() { return parsedStmt; } + public int getExecTimeout() { + return parsedStmt.getTimeout(); + } + + public String getExecType() { + if (parsedStmt instanceof InsertStmt || parsedStmt instanceof CreateTableAsSelectStmt) { + return "Insert"; + } else if (parsedStmt instanceof UpdateStmt) { + return "Update"; + } else if (parsedStmt instanceof DeleteStmt) { + return "Delete"; + } else { + return "Query"; + } + } + + public boolean isExecLoadType() { + return parsedStmt instanceof DmlStmt || parsedStmt instanceof CreateTableAsSelectStmt; + } + // Execute one statement. // Exception: // IOException: talk with client failed. @@ -2815,29 +2835,4 @@ public void addFinishedQueryDetail() { QueryDetailQueue.addQueryDetail(queryDetail); } - - public int getExecTimeout() { - if (parsedStmt instanceof DmlStmt) { - int timeout = ((DmlStmt) parsedStmt).getTimeout(); - return timeout != -1 ? timeout : context.getSessionVariable().getInsertTimeoutS(); - } - if (parsedStmt instanceof CreateTableAsSelectStmt) { - InsertStmt insertStmt = ((CreateTableAsSelectStmt) parsedStmt).getInsertStmt(); - int timeout = insertStmt.getTimeout(); - return timeout != -1 ? timeout : context.getSessionVariable().getInsertTimeoutS(); - } - return context.getSessionVariable().getQueryTimeoutS(); - } - - public String getExecType() { - if (parsedStmt instanceof InsertStmt || parsedStmt instanceof CreateTableAsSelectStmt) { - return "Insert"; - } else if (parsedStmt instanceof UpdateStmt) { - return "Update"; - } else if (parsedStmt instanceof DeleteStmt) { - return "Delete"; - } else { - return "Query"; - } - } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/CreateTableAsSelectStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/CreateTableAsSelectStmt.java index 4da1e00f2b201..855bd80f9a88a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/CreateTableAsSelectStmt.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/CreateTableAsSelectStmt.java @@ -78,4 +78,9 @@ public String toSql() { public R accept(AstVisitor visitor, C context) { return visitor.visitCreateTableAsSelectStatement(this, context); } + + @Override + public int getTimeout() { + return insertStmt.getTimeout(); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/StatementBase.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/StatementBase.java index 703b4860e1b89..1861ee2bd2642 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/StatementBase.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/StatementBase.java @@ -39,6 +39,7 @@ import com.starrocks.analysis.ParseNode; import com.starrocks.analysis.RedirectStatus; import com.starrocks.common.profile.Tracers; +import com.starrocks.qe.ConnectContext; import com.starrocks.qe.OriginStatement; import com.starrocks.sql.parser.NodePosition; import org.apache.commons.collections4.CollectionUtils; @@ -169,4 +170,8 @@ public void setAllQueryScopeHints(List hintNodes) { public boolean isExistQueryScopeHint() { return CollectionUtils.isNotEmpty(allQueryScopeHints); } + + public int getTimeout() { + return ConnectContext.get().getSessionVariable().getQueryTimeoutS(); + } } From 4d253fa93d7bd048fba73441de73c78680a3ebf1 Mon Sep 17 00:00:00 2001 From: wyb Date: Fri, 20 Sep 2024 13:30:27 +0800 Subject: [PATCH 3/8] Add ut Signed-off-by: wyb --- .../java/com/starrocks/qe/ConnectContext.java | 2 +- .../java/com/starrocks/qe/ResultReceiver.java | 8 ++-- .../java/com/starrocks/qe/StmtExecutor.java | 2 +- .../com/starrocks/qe/StmtExecutorTest.java | 38 +++++++++++++++++++ 4 files changed, 44 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java index 56b2238e226a0..2e273ba268683 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java @@ -995,7 +995,7 @@ public void checkTimeout(long now) { killFlag = true; String suggestedMsg = String.format("please increase the '%s' session variable", - isExecLoadType() ? "insert_timeout" : "query_timeout"); + isExecLoadType() ? SessionVariable.INSERT_TIMEOUT : SessionVariable.QUERY_TIMEOUT); errMsg = ErrorCode.ERR_TIMEOUT.formatErrorMsg(getExecType(), timeoutSecond, suggestedMsg); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java b/fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java index 7b7367f5065ed..12e92118e4723 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java @@ -140,16 +140,16 @@ public RowBatch getNext(Status status) throws TException { LOG.warn("fetch result execution exception, finstId={}", DebugUtil.printId(finstId), e); if (e.getMessage().contains("time out")) { // if timeout, we set error code to TIMEOUT, and it will not retry querying. - status.setStatus(new Status(TStatusCode.TIMEOUT, ErrorCode.ERR_TIMEOUT.formatErrorMsg( - "Query", timeoutMs / 1000, "please increase the 'query_timeout' session variable and retry"))); + status.setStatus(new Status(TStatusCode.TIMEOUT, ErrorCode.ERR_TIMEOUT.formatErrorMsg("Query", timeoutMs / 1000, + String.format("please increase the '%s' session variable and retry", SessionVariable.QUERY_TIMEOUT)))); } else { status.setRpcStatus(e.getMessage()); SimpleScheduler.addToBlocklist(backendId); } } catch (TimeoutException e) { LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e); - status.setTimeOutStatus(ErrorCode.ERR_TIMEOUT.formatErrorMsg( - "Query", timeoutMs / 1000, "please increase the 'query_timeout' session variable and retry")); + status.setTimeOutStatus(ErrorCode.ERR_TIMEOUT.formatErrorMsg("Query", timeoutMs / 1000, + String.format("please increase the '%s' session variable and retry", SessionVariable.QUERY_TIMEOUT))); } if (isCancel) { diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index 12516506b8a9f..aa449786cb351 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -2362,7 +2362,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { "or set parallel_fragment_exec_instance_num to a lower value in session variable"); } else { ErrorReport.reportTimeoutException(ErrorCode.ERR_TIMEOUT, getExecType(), timeout, - "please increase the 'insert_timeout' session variable and retry"); + String.format("please increase the '%s' session variable", SessionVariable.INSERT_TIMEOUT)); } } } diff --git a/fe/fe-core/src/test/java/com/starrocks/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/com/starrocks/qe/StmtExecutorTest.java index 762a9c9273777..0556a22a1fe1a 100644 --- a/fe/fe-core/src/test/java/com/starrocks/qe/StmtExecutorTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/qe/StmtExecutorTest.java @@ -15,8 +15,10 @@ package com.starrocks.qe; import com.starrocks.server.GlobalStateMgr; +import com.starrocks.sql.ast.StatementBase; import com.starrocks.sql.parser.AstBuilder; import com.starrocks.sql.parser.SqlParser; +import com.starrocks.utframe.UtFrameUtils; import mockit.Expectations; import mockit.Mocked; import org.junit.Assert; @@ -49,4 +51,40 @@ public void testIsForwardToLeader(@Mocked GlobalStateMgr state) { Assert.assertFalse(new StmtExecutor(new ConnectContext(), SqlParser.parseSingleStatement("show frontends", SqlModeHelper.MODE_DEFAULT)).isForwardToLeader()); } + + @Test + public void testExecType() { + ConnectContext ctx = UtFrameUtils.createDefaultCtx(); + ConnectContext.threadLocalInfo.set(ctx); + + StatementBase stmt = SqlParser.parseSingleStatement("select * from t1", SqlModeHelper.MODE_DEFAULT); + StmtExecutor executor = new StmtExecutor(new ConnectContext(), stmt); + Assert.assertEquals("Query", executor.getExecType()); + Assert.assertFalse(executor.isExecLoadType()); + Assert.assertEquals(ConnectContext.get().getSessionVariable().getQueryTimeoutS(), executor.getExecTimeout()); + + stmt = SqlParser.parseSingleStatement("insert into t1 select * from t2", SqlModeHelper.MODE_DEFAULT); + executor = new StmtExecutor(new ConnectContext(), stmt); + Assert.assertEquals("Insert", executor.getExecType()); + Assert.assertTrue(executor.isExecLoadType()); + Assert.assertEquals(ConnectContext.get().getSessionVariable().getInsertTimeoutS(), executor.getExecTimeout()); + + stmt = SqlParser.parseSingleStatement("create table t1 as select * from t2", SqlModeHelper.MODE_DEFAULT); + executor = new StmtExecutor(new ConnectContext(), stmt); + Assert.assertEquals("Insert", executor.getExecType()); + Assert.assertTrue(executor.isExecLoadType()); + Assert.assertEquals(ConnectContext.get().getSessionVariable().getInsertTimeoutS(), executor.getExecTimeout()); + + stmt = SqlParser.parseSingleStatement("update t1 set k1 = 1 where k2 = 1", SqlModeHelper.MODE_DEFAULT); + executor = new StmtExecutor(new ConnectContext(), stmt); + Assert.assertEquals("Update", executor.getExecType()); + Assert.assertTrue(executor.isExecLoadType()); + Assert.assertEquals(ConnectContext.get().getSessionVariable().getInsertTimeoutS(), executor.getExecTimeout()); + + stmt = SqlParser.parseSingleStatement("delete from t1 where k2 = 1", SqlModeHelper.MODE_DEFAULT); + executor = new StmtExecutor(new ConnectContext(), stmt); + Assert.assertEquals("Delete", executor.getExecType()); + Assert.assertTrue(executor.isExecLoadType()); + Assert.assertEquals(ConnectContext.get().getSessionVariable().getInsertTimeoutS(), executor.getExecTimeout()); + } } From 16d9a8567da330980aebe3d5e94cb04c716a5a9b Mon Sep 17 00:00:00 2001 From: wyb Date: Wed, 6 Nov 2024 01:01:50 +0800 Subject: [PATCH 4/8] Update mv insert timeout and improve some error codes Signed-off-by: wyb --- .../starrocks/alter/AlterMVJobExecutor.java | 2 +- .../com/starrocks/alter/OptimizeJobV2.java | 2 +- .../java/com/starrocks/common/ErrorCode.java | 4 +-- .../common/util/PropertyAnalyzer.java | 2 +- .../java/com/starrocks/load/pipe/Pipe.java | 3 ++- .../PartitionBasedMvRefreshProcessor.java | 25 +++++++++++++------ .../sql/analyzer/SetStmtAnalyzer.java | 5 ++++ .../starrocks/load/pipe/PipeManagerTest.java | 8 +++--- .../R/test_show_materialized_view | 8 +++--- .../T/test_show_materialized_view | 4 +-- 10 files changed, 39 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/alter/AlterMVJobExecutor.java b/fe/fe-core/src/main/java/com/starrocks/alter/AlterMVJobExecutor.java index 2f7e0b3aee1f8..75aa04ef5e2f0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/alter/AlterMVJobExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/alter/AlterMVJobExecutor.java @@ -193,7 +193,7 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause modifyT if (!entry.getKey().startsWith(PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX)) { throw new SemanticException("Modify failed because unknown properties: " + properties + ", please add `session.` prefix if you want add session variables for mv(" + - "eg, \"session.query_timeout\"=\"30000000\")."); + "eg, \"session.insert_timeout\"=\"30000000\")."); } String varKey = entry.getKey().substring(PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX.length()); SystemVariable variable = new SystemVariable(varKey, new StringLiteral(entry.getValue())); diff --git a/fe/fe-core/src/main/java/com/starrocks/alter/OptimizeJobV2.java b/fe/fe-core/src/main/java/com/starrocks/alter/OptimizeJobV2.java index 39357379ad96b..497d45fba6ea1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/alter/OptimizeJobV2.java +++ b/fe/fe-core/src/main/java/com/starrocks/alter/OptimizeJobV2.java @@ -279,7 +279,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { rewriteTask.setTempPartitionName(tmpPartitionName); rewriteTask.setLastVersion(partitionLastVersion.get(i)); // use half of the alter timeout as rewrite task timeout - rewriteTask.getProperties().put(SessionVariable.QUERY_TIMEOUT, String.valueOf(timeoutMs / 2000)); + rewriteTask.getProperties().put(SessionVariable.INSERT_TIMEOUT, String.valueOf(timeoutMs / 2000)); rewriteTasks.add(rewriteTask); } diff --git a/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java b/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java index 0f769cf1c276e..20494ac446dce 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java @@ -319,7 +319,7 @@ public enum ErrorCode { "No partitions have data available for loading. If you are sure there may be no data to be loaded, " + "you can use `ADMIN SET FRONTEND CONFIG ('empty_load_as_error' = 'false')` " + "to ensure such load jobs can succeed"), - ERR_INSERT_COLUMN_COUNT_MISMATCH(5604, new byte[] {'2', '2', '0', '0', '0'}, + ERR_INSERT_COLUMN_COUNT_MISMATCH(5604, new byte[] {'4', '2', '6', '0', '1'}, "Inserted target column count: %d doesn't match select/value column count: %d"), ERR_ILLEGAL_BYTES_LENGTH(5605, new byte[] {'4', '2', '0', '0', '0'}, "The valid bytes length for '%s' is [%d, %d]"), ERR_TOO_MANY_ERROR_ROWS(5606, new byte[] {'2', '2', '0', '0', '0'}, @@ -328,7 +328,7 @@ public enum ErrorCode { ERR_ROUTINE_LOAD_OFFSET_INVALID(5607, new byte[] {'0', '2', '0', '0', '0'}, "Consume offset: %d is greater than the latest offset: %d in kafka partition: %d. " + "You can modify 'kafka_offsets' property through ALTER ROUTINE LOAD and RESUME the job"), - ERR_INSERT_COLUMN_NAME_MISMATCH(5608, new byte[] {'2', '2', '0', '0', '0'}, "%s column: %s has no matching %s column"), + ERR_INSERT_COLUMN_NAME_MISMATCH(5608, new byte[] {'4', '2', '6', '0', '1'}, "%s column: %s has no matching %s column"), /** * 5700 - 5799: Partition diff --git a/fe/fe-core/src/main/java/com/starrocks/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/common/util/PropertyAnalyzer.java index 7546a182b09e6..813a3d80fa45e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/util/PropertyAnalyzer.java @@ -1655,7 +1655,7 @@ private static SystemVariable getMVSystemVariable(Map properties throw new AnalysisException("Analyze materialized properties failed " + "because unknown properties: " + properties + ", please add `session.` prefix if you want add session variables for mv(" + - "eg, \"session.query_timeout\"=\"30000000\")."); + "eg, \"session.insert_timeout\"=\"30000000\")."); } String varKey = entry.getKey().substring( PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX.length()); diff --git a/fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java b/fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java index 834a1fd543350..b3671779230cf 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java @@ -36,6 +36,7 @@ import com.starrocks.load.pipe.filelist.FileListRepo; import com.starrocks.persist.gson.GsonPostProcessable; import com.starrocks.persist.gson.GsonUtils; +import com.starrocks.qe.SessionVariable; import com.starrocks.scheduler.Constants; import com.starrocks.scheduler.ExecuteOption; import com.starrocks.scheduler.SubmitResult; @@ -83,7 +84,7 @@ public class Pipe implements GsonPostProcessable { private static final ImmutableMap DEFAULT_TASK_EXECUTION_VARIABLES = ImmutableMap.builder() - .put("query_timeout", "3600") + .put(SessionVariable.INSERT_TIMEOUT, "3600") .build(); @SerializedName(value = "name") diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java index 9a15bcea276d7..58113bb191188 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java @@ -115,13 +115,16 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor { private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0); // session.enable_spill - public static final String MV_SESSION_ENABLE_SPILL = + private static final String MV_SESSION_ENABLE_SPILL = PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX + SessionVariable.ENABLE_SPILL; - // session.query_timeout - public static final String MV_SESSION_TIMEOUT = + // session.query_timeout. Deprecated, only for compatibility with old version + private static final String MV_SESSION_QUERY_TIMEOUT = PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX + SessionVariable.QUERY_TIMEOUT; - // default query timeout for mv: 1 hour - private static final int MV_DEFAULT_QUERY_TIMEOUT = 3600; + // session.insert_timeout + private static final String MV_SESSION_INSERT_TIMEOUT = + PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX + SessionVariable.INSERT_TIMEOUT; + // default insert timeout for mv: 1 hour + private static final int MV_DEFAULT_INSERT_TIMEOUT = 3600; private Database db; @@ -580,9 +583,15 @@ private void changeDefaultConnectContextIfNeeded(ConnectContext mvConnectCtx) { mvSessionVariable.setEnableSpill(true); } - // change `query_timeout` to 1 hour by default for better user experience. - if (!mvProperty.getProperties().containsKey(MV_SESSION_TIMEOUT)) { - mvSessionVariable.setInsertTimeoutS(MV_DEFAULT_QUERY_TIMEOUT); + if (!mvProperty.getProperties().containsKey(MV_SESSION_INSERT_TIMEOUT)) { + if (!mvProperty.getProperties().containsKey(MV_SESSION_QUERY_TIMEOUT)) { + // change `insert_timeout` to 1 hour by default for better user experience. + mvSessionVariable.setInsertTimeoutS(MV_DEFAULT_INSERT_TIMEOUT); + } else { + // for compatibility + mvProperty.getProperties().put(MV_SESSION_INSERT_TIMEOUT, + mvProperty.getProperties().remove(MV_SESSION_QUERY_TIMEOUT)); + } } // set insert_max_filter_ratio by default diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/SetStmtAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/SetStmtAnalyzer.java index 0c88d5c602b93..60cff3b8e235d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/SetStmtAnalyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/SetStmtAnalyzer.java @@ -160,6 +160,11 @@ private static void analyzeSystemVariable(SystemVariable var) { 1L, (long) SessionVariable.MAX_QUERY_TIMEOUT); } + if (variable.equalsIgnoreCase(SessionVariable.INSERT_TIMEOUT)) { + checkRangeLongVariable(resolvedExpression, SessionVariable.INSERT_TIMEOUT, + 1L, (long) SessionVariable.MAX_QUERY_TIMEOUT); + } + if (variable.equalsIgnoreCase(SessionVariable.NEW_PLANNER_OPTIMIZER_TIMEOUT)) { checkRangeLongVariable(resolvedExpression, SessionVariable.NEW_PLANNER_OPTIMIZER_TIMEOUT, 1L, null); } diff --git a/fe/fe-core/src/test/java/com/starrocks/load/pipe/PipeManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/pipe/PipeManagerTest.java index a798131df1cdd..353a5a20fe4a8 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/pipe/PipeManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/pipe/PipeManagerTest.java @@ -957,18 +957,18 @@ public void testProperty() throws Exception { public void testTaskProperties() throws Exception { mockRepoExecutor(); String pipeName = "p_task_properties"; - createPipe("create pipe p_task_properties properties('task.query_timeout'='20') " + + createPipe("create pipe p_task_properties properties('task.insert_timeout'='20') " + " as insert into tbl1 select * from files('path'='fake://pipe', 'format'='parquet')"); Pipe pipe = getPipe(pipeName); - Assert.assertEquals("{\"task.query_timeout\":\"20\"}", pipe.getPropertiesJson()); - Assert.assertEquals(ImmutableMap.of("query_timeout", "20"), pipe.getTaskProperties()); + Assert.assertEquals("{\"task.insert_timeout\":\"20\"}", pipe.getPropertiesJson()); + Assert.assertEquals(ImmutableMap.of("insert_timeout", "20"), pipe.getTaskProperties()); dropPipe(pipeName); // default task execution variables createPipe("create pipe p_task_properties " + " as insert into tbl1 select * from files('path'='fake://pipe', 'format'='parquet')"); pipe = getPipe(pipeName); - Assert.assertEquals(ImmutableMap.of("query_timeout", "3600"), pipe.getTaskProperties()); + Assert.assertEquals(ImmutableMap.of("insert_timeout", "3600"), pipe.getTaskProperties()); } @Test diff --git a/test/sql/test_materialized_view/R/test_show_materialized_view b/test/sql/test_materialized_view/R/test_show_materialized_view index 7e7db3ddfe9f2..68394a2ecc52c 100644 --- a/test/sql/test_materialized_view/R/test_show_materialized_view +++ b/test/sql/test_materialized_view/R/test_show_materialized_view @@ -42,7 +42,7 @@ AS SELECT `user_tags`.`user_id`, bitmap_union(to_bitmap(`user_tags`.`tag_id`)) A FROM `test_show_materialized_view`.`user_tags` GROUP BY `user_tags`.`user_id`; -- !result -alter materialized view user_tags_mv1 set ("session.query_timeout" = "3600"); +alter materialized view user_tags_mv1 set ("session.insert_timeout" = "3600"); -- result: -- !result alter materialized view user_tags_mv1 set ("mv_rewrite_staleness_second" = "3600"); @@ -57,7 +57,7 @@ PROPERTIES ( "replicated_storage" = "true", "mv_rewrite_staleness_second" = "3600", "replication_num" = "1", -"session.query_timeout" = "3600", +"session.insert_timeout" = "3600", "storage_medium" = "HDD" ) AS SELECT `user_tags`.`user_id`, bitmap_union(to_bitmap(`user_tags`.`tag_id`)) AS `bitmap_union(to_bitmap(tag_id))` @@ -73,7 +73,7 @@ PROPERTIES ( "replicated_storage" = "true", "mv_rewrite_staleness_second" = "3600", "replication_num" = "1", -"session.query_timeout" = "3600", +"session.insert_timeout" = "3600", "storage_medium" = "HDD" ) AS SELECT `user_tags`.`user_id`, bitmap_union(to_bitmap(`user_tags`.`tag_id`)) AS `bitmap_union(to_bitmap(tag_id))` @@ -82,4 +82,4 @@ GROUP BY `user_tags`.`user_id`; -- !result drop database test_show_materialized_view; -- result: --- !result \ No newline at end of file +-- !result diff --git a/test/sql/test_materialized_view/T/test_show_materialized_view b/test/sql/test_materialized_view/T/test_show_materialized_view index 11446272f0e8c..d6d48d83ed856 100644 --- a/test/sql/test_materialized_view/T/test_show_materialized_view +++ b/test/sql/test_materialized_view/T/test_show_materialized_view @@ -7,8 +7,8 @@ create materialized view user_tags_mv1 distributed by hash(user_id) as select u show create materialized view user_tags_mv1; show create table user_tags_mv1; -alter materialized view user_tags_mv1 set ("session.query_timeout" = "3600"); +alter materialized view user_tags_mv1 set ("session.insert_timeout" = "3600"); alter materialized view user_tags_mv1 set ("mv_rewrite_staleness_second" = "3600"); show create materialized view user_tags_mv1; show create table user_tags_mv1; -drop database test_show_materialized_view; \ No newline at end of file +drop database test_show_materialized_view; From bca5f55b4e50349446312b6896e88d31afe8d267 Mon Sep 17 00:00:00 2001 From: wyb Date: Fri, 8 Nov 2024 14:36:36 +0800 Subject: [PATCH 5/8] Fix online optimize job and remove pipe default timeout and make mv default timeout as insert_timeout Signed-off-by: wyb --- .../com/starrocks/alter/OnlineOptimizeJobV2.java | 2 +- .../main/java/com/starrocks/load/pipe/Pipe.java | 8 -------- .../PartitionBasedMvRefreshProcessor.java | 16 ++++------------ 3 files changed, 5 insertions(+), 21 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/alter/OnlineOptimizeJobV2.java b/fe/fe-core/src/main/java/com/starrocks/alter/OnlineOptimizeJobV2.java index eb80cdea2ae7d..ce96f776bd731 100644 --- a/fe/fe-core/src/main/java/com/starrocks/alter/OnlineOptimizeJobV2.java +++ b/fe/fe-core/src/main/java/com/starrocks/alter/OnlineOptimizeJobV2.java @@ -783,7 +783,7 @@ protected void executeSql(String sql) throws Exception { SessionVariable sessionVariable = context.getSessionVariable(); sessionVariable.setUsePageCache(false); sessionVariable.setEnableMaterializedViewRewrite(false); - sessionVariable.setQueryTimeoutS((int) timeoutMs / 2000); + sessionVariable.setInsertTimeoutS((int) timeoutMs / 2000); context.setExecutor(executor); context.setQueryId(UUIDUtil.genUUID()); diff --git a/fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java b/fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java index b3671779230cf..b334575a416a5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java @@ -16,7 +16,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import com.google.gson.Gson; import com.google.gson.annotations.SerializedName; import com.starrocks.analysis.TableName; @@ -36,7 +35,6 @@ import com.starrocks.load.pipe.filelist.FileListRepo; import com.starrocks.persist.gson.GsonPostProcessable; import com.starrocks.persist.gson.GsonUtils; -import com.starrocks.qe.SessionVariable; import com.starrocks.scheduler.Constants; import com.starrocks.scheduler.ExecuteOption; import com.starrocks.scheduler.SubmitResult; @@ -82,11 +80,6 @@ public class Pipe implements GsonPostProcessable { public static final long DEFAULT_BATCH_FILES = 256; public static final int FAILED_TASK_THRESHOLD = 5; - private static final ImmutableMap DEFAULT_TASK_EXECUTION_VARIABLES = - ImmutableMap.builder() - .put(SessionVariable.INSERT_TIMEOUT, "3600") - .build(); - @SerializedName(value = "name") private final String name; @SerializedName(value = "id") @@ -129,7 +122,6 @@ protected Pipe(PipeId id, String name, TableName targetTable, FilePipeSource sou this.createdTime = TimeUtils.getEpochSeconds(); this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); this.taskExecutionVariables = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); - this.taskExecutionVariables.putAll(DEFAULT_TASK_EXECUTION_VARIABLES); } public static Pipe fromStatement(long id, CreatePipeStmt stmt) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java index 58113bb191188..9126a5cac6241 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessor.java @@ -123,9 +123,6 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor { // session.insert_timeout private static final String MV_SESSION_INSERT_TIMEOUT = PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX + SessionVariable.INSERT_TIMEOUT; - // default insert timeout for mv: 1 hour - private static final int MV_DEFAULT_INSERT_TIMEOUT = 3600; - private Database db; private MaterializedView materializedView; @@ -583,15 +580,10 @@ private void changeDefaultConnectContextIfNeeded(ConnectContext mvConnectCtx) { mvSessionVariable.setEnableSpill(true); } - if (!mvProperty.getProperties().containsKey(MV_SESSION_INSERT_TIMEOUT)) { - if (!mvProperty.getProperties().containsKey(MV_SESSION_QUERY_TIMEOUT)) { - // change `insert_timeout` to 1 hour by default for better user experience. - mvSessionVariable.setInsertTimeoutS(MV_DEFAULT_INSERT_TIMEOUT); - } else { - // for compatibility - mvProperty.getProperties().put(MV_SESSION_INSERT_TIMEOUT, - mvProperty.getProperties().remove(MV_SESSION_QUERY_TIMEOUT)); - } + if (!mvProperty.getProperties().containsKey(MV_SESSION_INSERT_TIMEOUT) + && mvProperty.getProperties().containsKey(MV_SESSION_QUERY_TIMEOUT)) { + // for compatibility + mvProperty.getProperties().put(MV_SESSION_INSERT_TIMEOUT, mvProperty.getProperties().get(MV_SESSION_QUERY_TIMEOUT)); } // set insert_max_filter_ratio by default From c1a03dd7be4e8b2fb331466dae9b98d4ddcde8b3 Mon Sep 17 00:00:00 2001 From: wyb Date: Fri, 8 Nov 2024 16:13:32 +0800 Subject: [PATCH 6/8] Fix ut Signed-off-by: wyb --- .../src/main/java/com/starrocks/load/pipe/Pipe.java | 12 +++++++----- .../com/starrocks/load/pipe/PipeManagerTest.java | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java b/fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java index b334575a416a5..69e263edeed8a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java @@ -80,6 +80,8 @@ public class Pipe implements GsonPostProcessable { public static final long DEFAULT_BATCH_FILES = 256; public static final int FAILED_TASK_THRESHOLD = 5; + private static final String TASK_PROPERTY_PREFIX = "task."; + @SerializedName(value = "name") private final String name; @SerializedName(value = "id") @@ -138,9 +140,9 @@ public static Pipe fromStatement(long id, CreatePipeStmt stmt) { public void processProperties(Map properties) { for (Map.Entry entry : properties.entrySet()) { - String key = entry.getKey(); + String key = entry.getKey().toLowerCase(); String value = entry.getValue(); - switch (key.toLowerCase()) { + switch (key) { case PipeAnalyzer.PROPERTY_POLL_INTERVAL: { this.pollIntervalSecond = Integer.parseInt(value); break; @@ -164,11 +166,11 @@ public void processProperties(Map properties) { } default: { // task execution variables - if (key.toUpperCase().startsWith("TASK.")) { - String taskVariable = StringUtils.removeStart(key.toUpperCase(), "TASK."); + if (key.startsWith(TASK_PROPERTY_PREFIX)) { + String taskVariable = StringUtils.removeStart(key, TASK_PROPERTY_PREFIX); this.taskExecutionVariables.put(taskVariable, value); } else { - throw new IllegalArgumentException("unsupported property: " + key); + throw new IllegalArgumentException("unsupported property: " + entry.getKey()); } } } diff --git a/fe/fe-core/src/test/java/com/starrocks/load/pipe/PipeManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/load/pipe/PipeManagerTest.java index 353a5a20fe4a8..fb81ce37a9959 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/pipe/PipeManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/pipe/PipeManagerTest.java @@ -813,7 +813,7 @@ public void pipeCRUD() throws Exception { AlterPipeStmt alterStmt = (AlterPipeStmt) UtFrameUtils.parseStmtWithNewParser(sql, ctx); pm.alterPipe(alterStmt); pipe = getPipe("p_crud"); - Assert.assertEquals("{\"auto_ingest\":\"false\",\"BATCH_SIZE\":\"10GB\"}", pipe.getPropertiesJson()); + Assert.assertEquals("{\"auto_ingest\":\"false\",\"batch_size\":\"10GB\"}", pipe.getPropertiesJson()); // drop sql = "drop pipe p_crud"; @@ -968,7 +968,7 @@ public void testTaskProperties() throws Exception { createPipe("create pipe p_task_properties " + " as insert into tbl1 select * from files('path'='fake://pipe', 'format'='parquet')"); pipe = getPipe(pipeName); - Assert.assertEquals(ImmutableMap.of("insert_timeout", "3600"), pipe.getTaskProperties()); + Assert.assertEquals(ImmutableMap.of(), pipe.getTaskProperties()); } @Test From f09e24f3980af3534d07019f16073daba60e397e Mon Sep 17 00:00:00 2001 From: wyb Date: Sun, 10 Nov 2024 12:21:14 +0800 Subject: [PATCH 7/8] Add ut Signed-off-by: wyb --- .../java/com/starrocks/qe/ConnectContext.java | 5 +- .../com/starrocks/qe/ConnectContextTest.java | 51 +++++++++++++++---- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java index 2e273ba268683..374e49dd22dc1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java @@ -988,8 +988,9 @@ public void checkTimeout(long now) { } else { long timeoutSecond = getExecTimeout(); if (delta > timeoutSecond * 1000L) { - LOG.warn("kill {} timeout, remote: {}, query timeout: {}, query id: {}, sql: {}", getExecType(), - getMysqlChannel().getRemoteHostPortString(), timeoutSecond, queryId, sql); + LOG.warn("kill timeout {}, remote: {}, execute timeout: {}, query id: {}, sql: {}", + getExecType().toLowerCase(), getMysqlChannel().getRemoteHostPortString(), timeoutSecond, + queryId, sql); // Only kill killFlag = true; diff --git a/fe/fe-core/src/test/java/com/starrocks/qe/ConnectContextTest.java b/fe/fe-core/src/test/java/com/starrocks/qe/ConnectContextTest.java index 4a16908472ef4..6d53e2adc31fe 100644 --- a/fe/fe-core/src/test/java/com/starrocks/qe/ConnectContextTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/qe/ConnectContextTest.java @@ -34,6 +34,7 @@ package com.starrocks.qe; +import com.starrocks.analysis.TableName; import com.starrocks.common.Status; import com.starrocks.common.util.TimeUtils; import com.starrocks.mysql.MysqlCapability; @@ -41,6 +42,9 @@ import com.starrocks.mysql.MysqlCommand; import com.starrocks.server.GlobalStateMgr; import com.starrocks.server.WarehouseManager; +import com.starrocks.sql.ast.InsertStmt; +import com.starrocks.sql.ast.QueryStatement; +import com.starrocks.sql.ast.ValuesRelation; import com.starrocks.thrift.TStatus; import com.starrocks.thrift.TStatusCode; import com.starrocks.thrift.TUniqueId; @@ -58,8 +62,6 @@ public class ConnectContextTest { @Mocked private MysqlChannel channel; @Mocked - private StmtExecutor executor; - @Mocked private SocketChannel socketChannel; @Mocked private GlobalStateMgr globalStateMgr; @@ -83,9 +85,6 @@ public void setUp() throws Exception { minTimes = 0; result = "192.168.1.1"; - executor.cancel("set up"); - minTimes = 0; - globalStateMgr.getVariableMgr(); minTimes = 0; result = variableMgr; @@ -189,7 +188,6 @@ public void testSleepTimeout() { // Timeout ctx.setStartTime(); now = ctx.getStartTime() + ctx.getSessionVariable().getWaitTimeoutS() * 1000 + 1; - ctx.setExecutor(executor); ctx.checkTimeout(now); Assert.assertTrue(ctx.isKilled()); @@ -204,18 +202,22 @@ public void testSleepTimeout() { } @Test - public void testOtherTimeout() { + public void testQueryTimeout() { ConnectContext ctx = new ConnectContext(socketChannel); ctx.setCommand(MysqlCommand.COM_QUERY); + ctx.setThreadLocalInfo(); - // sleep no time out + StmtExecutor executor = new StmtExecutor(ctx, new QueryStatement(ValuesRelation.newDualRelation())); + ctx.setExecutor(executor); + + // query no time out Assert.assertFalse(ctx.isKilled()); - long now = ctx.getSessionVariable().getQueryTimeoutS() * 1000 - 1; + long now = ctx.getStartTime() + ctx.getSessionVariable().getQueryTimeoutS() * 1000 - 1; ctx.checkTimeout(now); Assert.assertFalse(ctx.isKilled()); // Timeout - now = ctx.getSessionVariable().getQueryTimeoutS() * 1000 + 1; + now = ctx.getStartTime() + ctx.getSessionVariable().getQueryTimeoutS() * 1000 + 1; ctx.checkTimeout(now); Assert.assertFalse(ctx.isKilled()); @@ -227,6 +229,35 @@ public void testOtherTimeout() { ctx.cleanup(); } + @Test + public void testInsertTimeout() { + ConnectContext ctx = new ConnectContext(socketChannel); + ctx.setCommand(MysqlCommand.COM_QUERY); + ctx.setThreadLocalInfo(); + + StmtExecutor executor = new StmtExecutor( + ctx, new InsertStmt(new TableName("db", "tbl"), new QueryStatement(ValuesRelation.newDualRelation()))); + ctx.setExecutor(executor); + + // insert no time out + Assert.assertFalse(ctx.isKilled()); + long now = ctx.getStartTime() + ctx.getSessionVariable().getInsertTimeoutS() * 1000 - 1; + ctx.checkTimeout(now); + Assert.assertFalse(ctx.isKilled()); + + // Timeout + now = ctx.getStartTime() + ctx.getSessionVariable().getInsertTimeoutS() * 1000 + 1; + ctx.checkTimeout(now); + Assert.assertFalse(ctx.isKilled()); + + // Kill + ctx.kill(true, "insert timeout"); + Assert.assertTrue(ctx.isKilled()); + + // clean up + ctx.cleanup(); + } + @Test public void testThreadLocal() { ConnectContext ctx = new ConnectContext(socketChannel); From b2d1326ee89976926c95a876922f10acb8cac66c Mon Sep 17 00:00:00 2001 From: wyb Date: Tue, 12 Nov 2024 11:50:53 +0800 Subject: [PATCH 8/8] Fix error code Signed-off-by: wyb --- .../src/main/java/com/starrocks/qe/DefaultCoordinator.java | 3 ++- fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java b/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java index 8cec82c7e92e4..db3f21495fb3d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java @@ -910,7 +910,8 @@ public RowBatch getNext() throws Exception { copyStatus.getErrorMsg().equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR)) { ec = InternalErrorCode.CANCEL_NODE_NOT_ALIVE_ERR; } else if (copyStatus.isTimeout()) { - ErrorReport.reportTimeoutException(ErrorCode.ERR_QUERY_TIMEOUT, errMsg); + ErrorReport.reportTimeoutException( + ErrorCode.ERR_TIMEOUT, "Query", jobSpec.getQueryOptions().query_timeout, errMsg); } throw new UserException(ec, errMsg); } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index aa449786cb351..14f7af572d90f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -2362,7 +2362,8 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { "or set parallel_fragment_exec_instance_num to a lower value in session variable"); } else { ErrorReport.reportTimeoutException(ErrorCode.ERR_TIMEOUT, getExecType(), timeout, - String.format("please increase the '%s' session variable", SessionVariable.INSERT_TIMEOUT)); + String.format("please increase the '%s' session variable and retry", + SessionVariable.INSERT_TIMEOUT)); } } }