[Enhancement] Support insert timeout #50644

Open · wants to merge 7 commits into main

@@ -193,7 +193,7 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause modifyT
if (!entry.getKey().startsWith(PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX)) {
throw new SemanticException("Modify failed because unknown properties: " + properties +
", please add `session.` prefix if you want add session variables for mv(" +
"eg, \"session.query_timeout\"=\"30000000\").");
"eg, \"session.insert_timeout\"=\"30000000\").");
}
String varKey = entry.getKey().substring(PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX.length());
SystemVariable variable = new SystemVariable(varKey, new StringLiteral(entry.getValue()));

@@ -783,7 +783,7 @@ protected void executeSql(String sql) throws Exception {
SessionVariable sessionVariable = context.getSessionVariable();
sessionVariable.setUsePageCache(false);
sessionVariable.setEnableMaterializedViewRewrite(false);
sessionVariable.setQueryTimeoutS((int) timeoutMs / 2000);
sessionVariable.setInsertTimeoutS((int) timeoutMs / 2000);

context.setExecutor(executor);
context.setQueryId(UUIDUtil.genUUID());

@@ -279,7 +279,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
rewriteTask.setTempPartitionName(tmpPartitionName);
rewriteTask.setLastVersion(partitionLastVersion.get(i));
// use half of the alter timeout as rewrite task timeout
rewriteTask.getProperties().put(SessionVariable.QUERY_TIMEOUT, String.valueOf(timeoutMs / 2000));
rewriteTask.getProperties().put(SessionVariable.INSERT_TIMEOUT, String.valueOf(timeoutMs / 2000));
rewriteTasks.add(rewriteTask);
}


@@ -630,7 +630,7 @@ private void execute(ExecPlan execPlan, ConnectContext context) throws Exception
fragments, scanNodes);

QeProcessorImpl.INSTANCE.registerQuery(queryId, coord);
int leftTimeSecond = context.getSessionVariable().getQueryTimeoutS();
int leftTimeSecond = context.getExecTimeout();
coord.setTimeoutSecond(leftTimeSecond);
coord.exec();


@@ -144,8 +144,7 @@ public enum ErrorCode {
ERR_INVALID_VALUE(5018, new byte[] {'H', 'Y', '0', '0', '0'}, "Invalid %s: '%s'. Expected values should be %s"),
ERR_NO_ALTER_OPERATION(5023, new byte[] {'H', 'Y', '0', '0', '0'},
"No operation in alter statement"),
ERR_QUERY_TIMEOUT(5024, new byte[] {'H', 'Y', '0', '0', '0'},
"Query timeout. %s"),
ERR_TIMEOUT(5024, new byte[] {'H', 'Y', '0', '0', '0'}, "%s reached its timeout of %d seconds, %s"),
ERR_FAILED_WHEN_INSERT(5025, new byte[] {'H', 'Y', '0', '0', '0'}, "Failed when INSERT execute"),
ERR_UNSUPPORTED_TYPE_IN_CTAS(5026, new byte[] {'H', 'Y', '0', '0', '0'},
"Unsupported type '%s' in create table as select statement"),
@@ -320,7 +319,7 @@ public enum ErrorCode {
"No partitions have data available for loading. If you are sure there may be no data to be loaded, " +
"you can use `ADMIN SET FRONTEND CONFIG ('empty_load_as_error' = 'false')` " +
"to ensure such load jobs can succeed"),
ERR_INSERT_COLUMN_COUNT_MISMATCH(5604, new byte[] {'2', '2', '0', '0', '0'},
ERR_INSERT_COLUMN_COUNT_MISMATCH(5604, new byte[] {'4', '2', '6', '0', '1'},
"Inserted target column count: %d doesn't match select/value column count: %d"),
ERR_ILLEGAL_BYTES_LENGTH(5605, new byte[] {'4', '2', '0', '0', '0'}, "The valid bytes length for '%s' is [%d, %d]"),
ERR_TOO_MANY_ERROR_ROWS(5606, new byte[] {'2', '2', '0', '0', '0'},
@@ -329,7 +328,7 @@ public enum ErrorCode {
ERR_ROUTINE_LOAD_OFFSET_INVALID(5607, new byte[] {'0', '2', '0', '0', '0'},
"Consume offset: %d is greater than the latest offset: %d in kafka partition: %d. " +
"You can modify 'kafka_offsets' property through ALTER ROUTINE LOAD and RESUME the job"),
ERR_INSERT_COLUMN_NAME_MISMATCH(5608, new byte[] {'2', '2', '0', '0', '0'}, "%s column: %s has no matching %s column"),
ERR_INSERT_COLUMN_NAME_MISMATCH(5608, new byte[] {'4', '2', '6', '0', '1'}, "%s column: %s has no matching %s column"),

/**
* 5700 - 5799: Partition

@@ -1655,7 +1655,7 @@ private static SystemVariable getMVSystemVariable(Map<String, String> properties
throw new AnalysisException("Analyze materialized properties failed " +
"because unknown properties: " + properties +
", please add `session.` prefix if you want add session variables for mv(" +
"eg, \"session.query_timeout\"=\"30000000\").");
"eg, \"session.insert_timeout\"=\"30000000\").");
}
String varKey = entry.getKey().substring(
PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX.length());

@@ -76,7 +76,7 @@ public static DataCacheSelectMetrics cacheSelect(DataCacheSelectStatement statem
DataCacheSelectMetrics metrics = null;
Coordinator coordinator = stmtExecutor.getCoordinator();
Preconditions.checkNotNull(coordinator, "Coordinator can't be null");
coordinator.join(connectContext.getSessionVariable().getQueryTimeoutS());
coordinator.join(stmtExecutor.getExecTimeout());
if (coordinator.isDone()) {
metrics = stmtExecutor.getCoordinator().getDataCacheSelectMetrics();
}

17 changes: 6 additions & 11 deletions fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java
@@ -16,7 +16,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import com.starrocks.analysis.TableName;
@@ -81,10 +80,7 @@ public class Pipe implements GsonPostProcessable {
public static final long DEFAULT_BATCH_FILES = 256;
public static final int FAILED_TASK_THRESHOLD = 5;

private static final ImmutableMap<String, String> DEFAULT_TASK_EXECUTION_VARIABLES =
ImmutableMap.<String, String>builder()
.put("query_timeout", "3600")
.build();
private static final String TASK_PROPERTY_PREFIX = "task.";

@SerializedName(value = "name")
private final String name;
@@ -128,7 +124,6 @@ protected Pipe(PipeId id, String name, TableName targetTable, FilePipeSource sou
this.createdTime = TimeUtils.getEpochSeconds();
this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
this.taskExecutionVariables = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
this.taskExecutionVariables.putAll(DEFAULT_TASK_EXECUTION_VARIABLES);
}

public static Pipe fromStatement(long id, CreatePipeStmt stmt) {
@@ -145,9 +140,9 @@ public static Pipe fromStatement(long id, CreatePipeStmt stmt) {

public void processProperties(Map<String, String> properties) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
String key = entry.getKey();
String key = entry.getKey().toLowerCase();
String value = entry.getValue();
switch (key.toLowerCase()) {
switch (key) {
case PipeAnalyzer.PROPERTY_POLL_INTERVAL: {
this.pollIntervalSecond = Integer.parseInt(value);
break;
@@ -171,11 +166,11 @@
}
default: {
// task execution variables
if (key.toUpperCase().startsWith("TASK.")) {
String taskVariable = StringUtils.removeStart(key.toUpperCase(), "TASK.");
if (key.startsWith(TASK_PROPERTY_PREFIX)) {
String taskVariable = StringUtils.removeStart(key, TASK_PROPERTY_PREFIX);
this.taskExecutionVariables.put(taskVariable, value);
} else {
throw new IllegalArgumentException("unsupported property: " + key);
throw new IllegalArgumentException("unsupported property: " + entry.getKey());
}
}
}
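
A small, self-contained sketch of the property handling shown above (the class name and map here are hypothetical; only the lower-casing and `task.` prefix logic mirror the diff): keys are lower-cased once, `task.`-prefixed entries become task execution variables, and anything else is rejected with the original key in the error message.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo class; mirrors the normalization logic in Pipe.processProperties above.
public class PipePropertySketch {
    private static final String TASK_PROPERTY_PREFIX = "task.";

    public static void main(String[] args) {
        Map<String, String> taskVars = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        Map<String, String> props = Map.of("TASK.insert_timeout", "3600");
        for (Map.Entry<String, String> entry : props.entrySet()) {
            String key = entry.getKey().toLowerCase();   // normalize once, up front
            if (key.startsWith(TASK_PROPERTY_PREFIX)) {  // "task." entries become task execution variables
                taskVars.put(key.substring(TASK_PROPERTY_PREFIX.length()), entry.getValue());
            } else {
                // report the original (un-normalized) key, as the new code does
                throw new IllegalArgumentException("unsupported property: " + entry.getKey());
            }
        }
        System.out.println(taskVars); // {insert_timeout=3600}
    }
}
```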

33 changes: 27 additions & 6 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
@@ -947,6 +947,18 @@ public void kill(boolean killConnection, String cancelledMessage) {
}
}

public int getExecTimeout() {
return executor != null ? executor.getExecTimeout() : sessionVariable.getQueryTimeoutS();
}

private String getExecType() {
return executor != null ? executor.getExecType() : "Query";
}

private boolean isExecLoadType() {
return executor != null && executor.isExecLoadType();
}

public void checkTimeout(long now) {
long startTimeMillis = getStartTime();
if (startTimeMillis <= 0) {
@@ -960,27 +972,36 @@ public void checkTimeout(long now) {
if (executor != null) {
sql = executor.getOriginStmtInString();
}
String errMsg = "";
if (command == MysqlCommand.COM_SLEEP) {
if (delta > sessionVariable.getWaitTimeoutS() * 1000L) {
int waitTimeout = sessionVariable.getWaitTimeoutS();
if (delta > waitTimeout * 1000L) {
// Need kill this connection.
LOG.warn("kill wait timeout connection, remote: {}, wait timeout: {}, query id: {}, sql: {}",
getMysqlChannel().getRemoteHostPortString(), sessionVariable.getWaitTimeoutS(), queryId, sql);
getMysqlChannel().getRemoteHostPortString(), waitTimeout, queryId, sql);

killFlag = true;
killConnection = true;

errMsg = String.format("Connection reached its wait timeout of %d seconds", waitTimeout);
}
} else {
long timeoutSecond = sessionVariable.getQueryTimeoutS();
long timeoutSecond = getExecTimeout();
if (delta > timeoutSecond * 1000L) {
LOG.warn("kill query timeout, remote: {}, query timeout: {}, query id: {}, sql: {}",
getMysqlChannel().getRemoteHostPortString(), sessionVariable.getQueryTimeoutS(), queryId, sql);
LOG.warn("kill timeout {}, remote: {}, execute timeout: {}, query id: {}, sql: {}",
getExecType().toLowerCase(), getMysqlChannel().getRemoteHostPortString(), timeoutSecond,
queryId, sql);

// Only kill
killFlag = true;

String suggestedMsg = String.format("please increase the '%s' session variable",
isExecLoadType() ? SessionVariable.INSERT_TIMEOUT : SessionVariable.QUERY_TIMEOUT);
errMsg = ErrorCode.ERR_TIMEOUT.formatErrorMsg(getExecType(), timeoutSecond, suggestedMsg);
}
}
if (killFlag) {
kill(killConnection, "query timeout");
kill(killConnection, errMsg);
}
}
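
As a quick illustration of the message this branch now produces, here is a standalone approximation of ErrorCode.ERR_TIMEOUT.formatErrorMsg using the format string added above ("%s reached its timeout of %d seconds, %s"). It assumes formatErrorMsg simply applies the enum's format string, and the "Insert" exec-type string is an assumption (only "Query" is visible in this diff).

```java
// Hypothetical, self-contained sketch of the new timeout error messages.
public class TimeoutMessageSketch {
    private static final String ERR_TIMEOUT_FORMAT = "%s reached its timeout of %d seconds, %s";

    public static void main(String[] args) {
        // e.g. "Query reached its timeout of 300 seconds, please increase the 'query_timeout' session variable"
        System.out.println(String.format(ERR_TIMEOUT_FORMAT, "Query", 300,
                "please increase the 'query_timeout' session variable"));
        // e.g. "Insert reached its timeout of 14400 seconds, please increase the 'insert_timeout' session variable"
        System.out.println(String.format(ERR_TIMEOUT_FORMAT, "Insert", 14400,
                "please increase the 'insert_timeout' session variable"));
    }
}
```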


@@ -97,15 +97,15 @@ public LeaderOpExecutor(Pair<String, Integer> ipAndPort, StatementBase parsedStm
this.originStmt = originStmt;
this.ctx = ctx;
if (status.isNeedToWaitJournalSync()) {
this.waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
this.waitTimeoutMs = ctx.getExecTimeout() * 1000;
} else {
this.waitTimeoutMs = 0;
}
// set thriftTimeoutMs to query_timeout + thrift_rpc_timeout_ms
// set thriftTimeoutMs to exec timeout + thrift_rpc_timeout_ms
// so that we can return an execution timeout instead of a network timeout
this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000 + Config.thrift_rpc_timeout_ms;
this.thriftTimeoutMs = ctx.getExecTimeout() * 1000 + Config.thrift_rpc_timeout_ms;
if (this.thriftTimeoutMs < 0) {
this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
this.thriftTimeoutMs = ctx.getExecTimeout() * 1000;
}
this.parsedStmt = parsedStmt;
}

21 changes: 11 additions & 10 deletions fe/fe-core/src/main/java/com/starrocks/qe/ResultReceiver.java
@@ -34,6 +34,7 @@

package com.starrocks.qe;

import com.starrocks.common.ErrorCode;
import com.starrocks.common.Status;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.metric.MetricRepo;
@@ -62,19 +63,20 @@ public class ResultReceiver {
private volatile boolean isDone = false;
private volatile boolean isCancel = false;
private long packetIdx = 0;
private final long timeoutTs;
private final int timeoutMs;
private final long deadlineMs;
private final TNetworkAddress address;
private final PUniqueId finstId;
private final Long backendId;
private Thread currentThread;

public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, int timeoutMs) {
this.finstId = new PUniqueId();
this.finstId.hi = tid.hi;
this.finstId.lo = tid.lo;
this.backendId = backendId;
this.address = address;
this.timeoutTs = System.currentTimeMillis() + timeoutMs;
this.timeoutMs = timeoutMs;
this.deadlineMs = System.currentTimeMillis() + timeoutMs;
}

public RowBatch getNext(Status status) throws TException {
@@ -90,11 +92,11 @@ public RowBatch getNext(Status status) throws TException {
PFetchDataResult pResult = null;
while (pResult == null) {
long currentTs = System.currentTimeMillis();
if (currentTs >= timeoutTs) {
if (currentTs >= deadlineMs) {
throw new TimeoutException("query timeout");
}
try {
pResult = future.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS);
pResult = future.get(deadlineMs - currentTs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// continue to get result
LOG.info("future get interrupted Exception");
@@ -139,17 +141,16 @@ public RowBatch getNext(Status status) throws TException {
LOG.warn("fetch result execution exception, finstId={}", DebugUtil.printId(finstId), e);
if (e.getMessage().contains("time out")) {
// if timeout, we set error code to TIMEOUT, and it will not retry querying.
status.setStatus(new Status(TStatusCode.TIMEOUT,
String.format("Query exceeded time limit of %d seconds",
ConnectContext.get().getSessionVariable().getQueryTimeoutS())));
status.setStatus(new Status(TStatusCode.TIMEOUT, ErrorCode.ERR_TIMEOUT.formatErrorMsg("Query", timeoutMs / 1000,
String.format("please increase the '%s' session variable", SessionVariable.QUERY_TIMEOUT))));
} else {
status.setRpcStatus(e.getMessage());
SimpleScheduler.addToBlocklist(backendId);
}
} catch (TimeoutException e) {
LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e);
status.setInternalErrorStatus(String.format("Query exceeded time limit of %d seconds",
ConnectContext.get().getSessionVariable().getQueryTimeoutS()));
status.setInternalErrorStatus(ErrorCode.ERR_TIMEOUT.formatErrorMsg("Query", timeoutMs / 1000,
String.format("please increase the '%s' session variable", SessionVariable.QUERY_TIMEOUT)));
if (MetricRepo.hasInit) {
MetricRepo.COUNTER_QUERY_TIMEOUT.increase(1L);
}

12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
@@ -191,6 +191,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String MAX_PARALLEL_SCAN_INSTANCE_NUM = "max_parallel_scan_instance_num";
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
public static final String INSERT_MAX_FILTER_RATIO = "insert_max_filter_ratio";
public static final String INSERT_TIMEOUT = "insert_timeout";
Contributor: If the user has already set query_timeout, how is compatibility maintained when the user upgrades?

Contributor Author: The insert_timeout session variable is added to make it easier for users to change the insert timeout.
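
A minimal, hypothetical sketch of how the two variables coexist after this change (the real selection lives in ConnectContext.getExecTimeout() and the executor's getExecTimeout()/isExecLoadType() shown in this diff; the helper name and default values below are assumptions for illustration): load-type statements use insert_timeout, everything else keeps using query_timeout, so an existing query_timeout setting still governs queries after an upgrade.

```java
// Hypothetical standalone sketch, not the actual ConnectContext/StmtExecutor code.
public class TimeoutSelectionSketch {
    // Load-type statements (INSERT and friends) use insert_timeout; everything else keeps query_timeout.
    static int pickTimeoutSeconds(boolean isLoadStatement, int queryTimeoutS, int insertTimeoutS) {
        return isLoadStatement ? insertTimeoutS : queryTimeoutS;
    }

    public static void main(String[] args) {
        int queryTimeoutS = 300;     // assumed existing query_timeout value
        int insertTimeoutS = 14400;  // new insert_timeout default from this diff
        System.out.println(pickTimeoutSeconds(false, queryTimeoutS, insertTimeoutS)); // 300: queries unchanged
        System.out.println(pickTimeoutSeconds(true, queryTimeoutS, insertTimeoutS));  // 14400: inserts use the new variable
    }
}
```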

public static final String DYNAMIC_OVERWRITE = "dynamic_overwrite";
public static final String ENABLE_SPILL = "enable_spill";
public static final String ENABLE_SPILL_TO_REMOTE_STORAGE = "enable_spill_to_remote_storage";
@@ -1163,6 +1164,9 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = INSERT_MAX_FILTER_RATIO)
private double insertMaxFilterRatio = 0;

@VariableMgr.VarAttr(name = INSERT_TIMEOUT)
private int insertTimeoutS = 14400;
Contributor: It's odd if this value differs from the default pipe/mv task timeout.

Contributor Author: The value is the same as the broker load default job timeout.

Contributor Author: Updated the pipe/mv default timeouts to match the insert timeout.

@VariableMgr.VarAttr(name = ENABLE_SPILL)
private boolean enableSpill = false;

@@ -2851,6 +2855,14 @@ public void setInsertMaxFilterRatio(double insertMaxFilterRatio) {
this.insertMaxFilterRatio = insertMaxFilterRatio;
}

public int getInsertTimeoutS() {
return insertTimeoutS;
}

public void setInsertTimeoutS(int insertTimeoutS) {
this.insertTimeoutS = insertTimeoutS;
}

public boolean isEnableSpill() {
return enableSpill;
}

@@ -113,8 +113,7 @@ public void exec() throws UserException {
if (null == future) {
return;
}
PExecShortCircuitResult shortCircuitResult = future.get(
context.getSessionVariable().getQueryTimeoutS(), TimeUnit.SECONDS);
PExecShortCircuitResult shortCircuitResult = future.get(context.getExecTimeout(), TimeUnit.SECONDS);
watch.stop();
long t = watch.elapsed().toMillis();
MetricRepo.HISTO_SHORTCIRCUIT_RPC_LATENCY.update(t);