Skip to content

Commit

Permalink
[BugFix] fix an issue that user-defined variables sql unable to handl…
Browse files Browse the repository at this point in the history
…e variable dependencies. (#48483)

Signed-off-by: edwinhzhang <[email protected]>
(cherry picked from commit dc40504)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/qe/SetExecutor.java
#	fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
#	fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ExpressionAnalyzer.java
#	fe/fe-core/src/main/java/com/starrocks/sql/analyzer/SetStmtAnalyzer.java
#	fe/fe-core/src/main/java/com/starrocks/sql/parser/HintFactory.java
#	fe/fe-core/src/test/java/com/starrocks/qe/SetExecutorTest.java
#	fe/fe-core/src/test/java/com/starrocks/sql/analyzer/AST2StringBuilderTest.java
#	fe/fe-core/src/test/java/com/starrocks/sql/analyzer/AnalyzeSetVariableTest.java
#	fe/fe-core/src/test/java/com/starrocks/sql/plan/SetVarTest.java
#	test/sql/test_hint/R/test_hint
#	test/sql/test_hint/T/test_hint
  • Loading branch information
zhangheihei authored and mergify[bot] committed Jul 26, 2024
1 parent a95c1e0 commit 5020f14
Show file tree
Hide file tree
Showing 14 changed files with 1,727 additions and 8 deletions.
72 changes: 70 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.ssl.SSLContext;

// When one client connect in, we create a connect context for it.
Expand Down Expand Up @@ -128,7 +129,8 @@ public class ConnectContext {
// all the modified session variables, will forward to leader
protected Map<String, SetVar> modifiedSessionVariables = new HashMap<>();
// user define variable in this session
protected HashMap<String, UserVariable> userVariables;
protected Map<String, UserVariable> userVariables;
protected Map<String, UserVariable> userVariablesCopyInWrite;
// Scheduler this connection belongs to
protected ConnectScheduler connectScheduler;
// Executor
Expand Down Expand Up @@ -214,7 +216,7 @@ public ConnectContext(SocketChannel channel, SSLContext sslContext) {
isKilled = false;
serializer = MysqlSerializer.newInstance();
sessionVariable = VariableMgr.newSessionVariable();
userVariables = new HashMap<>();
userVariables = new ConcurrentHashMap<>();
command = MysqlCommand.COM_SLEEP;
queryDetail = null;
plannerProfile = new PlannerProfile();
Expand Down Expand Up @@ -337,6 +339,56 @@ public void modifyUserVariable(SetVar setVar) {
userVariables.put(setVar.getVariable(), userDefineVariable);
}

/**
* 1. The {@link ConnectContext#userVariables} in the current session should not be modified
* until you are sure that the set sql was executed successfully.
* 2. Changes to user variables during set sql execution should
* be effected in the {@link ConnectContext#userVariablesCopyInWrite}.
* */
public void modifyUserVariableCopyInWrite(UserVariable userVariable) {
if (userVariablesCopyInWrite != null) {
if (userVariablesCopyInWrite.size() > 1024) {
throw new SemanticException("User variable exceeds the maximum limit of 1024");
}
userVariablesCopyInWrite.put(userVariable.getVariable(), userVariable);
}
}

/**
* The SQL execution that sets the variable must reset userVariablesCopyInWrite when it finishes,
* either normally or abnormally.
*
* This method needs to be called at the time of setting the user variable.
* call by {@link SetExecutor#execute()}, {@link StmtExecutor#processQueryScopeHint()}
* */
public void resetUserVariableCopyInWrite() {
userVariablesCopyInWrite = null;
}

/**
* After the successful execution of the SQL that set the variable,
* the result of the change to the copy of userVariables is set back to the current session.
*
* call by {@link SetExecutor#execute()}, {@link StmtExecutor#processQueryScopeHint()}
* */
public void modifyUserVariables(Map<String, UserVariable> userVarCopyInWrite) {
if (userVarCopyInWrite.size() > 1024) {
throw new SemanticException("User variable exceeds the maximum limit of 1024");
}
this.userVariables = userVarCopyInWrite;
}

/**
* Instead of using {@link ConnectContext#userVariables} when set userVariables,
* use a copy of it, the purpose of which is to ensure atomicity/isolation of modifications to userVariables
*
* This method needs to be called at the time of setting the user variable.
* call by {@link SetExecutor#execute()}, {@link StmtExecutor#processQueryScopeHint()}
* */
public void modifyUserVariablesCopyInWrite(Map<String, UserVariable> userVariables) {
this.userVariablesCopyInWrite = userVariables;
}

public SetStmt getModifiedSessionVariables() {
List<SetVar> sessionVariables = new ArrayList<>();
if (!modifiedSessionVariables.isEmpty()) {
Expand Down Expand Up @@ -366,6 +418,22 @@ public void resetSessionVariable() {
modifiedSessionVariables.clear();
}

public UserVariable getUserVariableCopyInWrite(String variable) {
if (userVariablesCopyInWrite == null) {
return null;
}

return userVariablesCopyInWrite.get(variable);
}

public Map<String, UserVariable> getUserVariablesCopyInWrite() {
if (userVariablesCopyInWrite == null) {
return null;
}

return userVariablesCopyInWrite;
}

public void setSessionVariable(SessionVariable sessionVariable) {
this.sessionVariable = sessionVariable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,12 @@ public String toString() {
", idx=" + idx +
'}';
}

public String getOrigStmt() {
return originStmt;
}

public int getIdx() {
return idx;
}
}
53 changes: 53 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SetExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,18 @@
import com.starrocks.catalog.Type;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.DdlException;
<<<<<<< HEAD
import com.starrocks.common.Pair;
import com.starrocks.common.Status;
import com.starrocks.sql.StatementPlanner;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.QueryStatement;
import com.starrocks.sql.ast.SetNamesVar;
=======
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.analyzer.SetStmtAnalyzer;
import com.starrocks.sql.ast.SetListItem;
>>>>>>> dc40504bac ([BugFix] fix an issue that user-defined variables sql unable to handle variable dependencies. (#48483))
import com.starrocks.sql.ast.SetPassVar;
import com.starrocks.sql.ast.SetStmt;
import com.starrocks.sql.ast.SetTransaction;
Expand All @@ -53,6 +59,9 @@
import java.nio.charset.StandardCharsets;
import java.util.List;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Set executor
public class SetExecutor {
private final ConnectContext ctx;
Expand All @@ -63,8 +72,24 @@ public SetExecutor(ConnectContext ctx, SetStmt stmt) {
this.stmt = stmt;
}

<<<<<<< HEAD
private void setVariablesOfAllType(SetVar var) throws DdlException {
if (var instanceof SetPassVar) {
=======
private void setVariablesOfAllType(SetListItem var) throws DdlException {
if (var instanceof SystemVariable) {
ctx.modifySystemVariable((SystemVariable) var, false);
} else if (var instanceof UserVariable) {
UserVariable userVariable = (UserVariable) var;
SetStmtAnalyzer.calcuteUserVariable(userVariable);

if (userVariable.getEvaluatedExpression() == null) {
userVariable.deriveUserVariableExpressionResult(ctx);
}

ctx.modifyUserVariableCopyInWrite(userVariable);
} else if (var instanceof SetPassVar) {
>>>>>>> dc40504bac ([BugFix] fix an issue that user-defined variables sql unable to handle variable dependencies. (#48483))
// Set password
SetPassVar setPassVar = (SetPassVar) var;
ctx.getGlobalStateMgr().getAuth().setPassword(setPassVar);
Expand Down Expand Up @@ -100,8 +125,36 @@ private boolean isSessionVar(SetVar var) {
* @throws DdlException
*/
public void execute() throws DdlException {
<<<<<<< HEAD
for (SetVar var : stmt.getSetVars()) {
setVariablesOfAllType(var);
=======
Map<String, UserVariable> clonedUserVars = new ConcurrentHashMap<>();
boolean hasUserVar = stmt.getSetListItems().stream().anyMatch(var -> var instanceof UserVariable);
boolean executeSuccess = true;
if (hasUserVar) {
clonedUserVars.putAll(ctx.getUserVariables());
ctx.modifyUserVariablesCopyInWrite(clonedUserVars);
}
try {
for (SetListItem var : stmt.getSetListItems()) {
setVariablesOfAllType(var);
}
} catch (Throwable e) {
if (hasUserVar) {
executeSuccess = false;
}
throw e;
} finally {
//If the set sql contains more than one user variable,
//the atomicity of the modification of this set of variables must be ensured.
if (hasUserVar) {
ctx.resetUserVariableCopyInWrite();
if (executeSuccess) {
ctx.modifyUserVariables(clonedUserVars);
}
}
>>>>>>> dc40504bac ([BugFix] fix an issue that user-defined variables sql unable to handle variable dependencies. (#48483))
}
}

Expand Down
107 changes: 107 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -638,6 +639,112 @@ public void execute() throws Exception {
}
}

<<<<<<< HEAD
=======
private void clearQueryScopeHintContext() {
Iterator<Map.Entry<String, UserVariable>> iterator = context.userVariables.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, UserVariable> entry = iterator.next();
if (entry.getValue().isFromHint()) {
iterator.remove();
}
}
}

// support select hint e.g. select /*+ SET_VAR(query_timeout=1) */ sleep(3);
@VisibleForTesting
public void processQueryScopeHint() throws DdlException {
SessionVariable clonedSessionVariable = null;
UUID queryId = context.getQueryId();
Map<String, UserVariable> clonedUserVars = new ConcurrentHashMap<>();
clonedUserVars.putAll(context.getUserVariables());
boolean hasUserVariableHint = parsedStmt.getAllQueryScopeHints()
.stream().anyMatch(hint -> hint instanceof UserVariableHint);
if (hasUserVariableHint) {
context.modifyUserVariablesCopyInWrite(clonedUserVars);
}
boolean executeSuccess = true;
try {
for (HintNode hint : parsedStmt.getAllQueryScopeHints()) {
if (hint instanceof SetVarHint) {
if (clonedSessionVariable == null) {
clonedSessionVariable = (SessionVariable) context.sessionVariable.clone();
}
for (Map.Entry<String, String> entry : hint.getValue().entrySet()) {
VariableMgr.setSystemVariable(clonedSessionVariable,
new SystemVariable(entry.getKey(), new StringLiteral(entry.getValue())), true);
}
}

if (hint instanceof UserVariableHint) {
UserVariableHint userVariableHint = (UserVariableHint) hint;
for (Map.Entry<String, UserVariable> entry : userVariableHint.getUserVariables().entrySet()) {
if (context.userVariables.containsKey(entry.getKey())) {
throw new SemanticException(PARSER_ERROR_MSG.invalidUserVariableHint(entry.getKey(),
"the user variable name in the hint must not match any existing variable names"));
}
SetStmtAnalyzer.analyzeUserVariable(entry.getValue());
SetStmtAnalyzer.calcuteUserVariable(entry.getValue());
if (entry.getValue().getEvaluatedExpression() == null) {
try {
final UUID uuid = UUIDUtil.genUUID();
context.setQueryId(uuid);
context.setExecutionId(UUIDUtil.toTUniqueId(uuid));
entry.getValue().deriveUserVariableExpressionResult(context);
} finally {
context.setQueryId(queryId);
context.resetReturnRows();
context.getState().reset();
}
}
clonedUserVars.put(entry.getKey(), entry.getValue());
}
}
}
} catch (Throwable t) {
executeSuccess = false;
throw t;
} finally {
if (hasUserVariableHint) {
context.resetUserVariableCopyInWrite();
if (executeSuccess) {
context.modifyUserVariables(clonedUserVars);
}
}
}

if (clonedSessionVariable != null) {
context.setSessionVariable(clonedSessionVariable);
}
}

private boolean createTableCreatedByCTAS(CreateTableAsSelectStmt stmt) throws Exception {
try {
if (stmt instanceof CreateTemporaryTableAsSelectStmt) {
CreateTemporaryTableStmt createTemporaryTableStmt = (CreateTemporaryTableStmt) stmt.getCreateTableStmt();
createTemporaryTableStmt.setSessionId(context.getSessionId());
return context.getGlobalStateMgr().getMetadataMgr().createTemporaryTable(createTemporaryTableStmt);
} else {
return context.getGlobalStateMgr().getMetadataMgr().createTable(stmt.getCreateTableStmt());
}
} catch (DdlException e) {
throw new AnalysisException(e.getMessage());
}
}

private void dropTableCreatedByCTAS(CreateTableAsSelectStmt stmt) throws Exception {
if (stmt instanceof CreateTemporaryTableAsSelectStmt) {
DropTemporaryTableStmt dropTemporaryTableStmt =
new DropTemporaryTableStmt(true, stmt.getCreateTableStmt().getDbTbl(), true);
dropTemporaryTableStmt.setSessionId(context.getSessionId());
DDLStmtExecutor.execute(dropTemporaryTableStmt, context);
} else {
DDLStmtExecutor.execute(new DropTableStmt(
true, stmt.getCreateTableStmt().getDbTbl(), true), context);
}
}

>>>>>>> dc40504bac ([BugFix] fix an issue that user-defined variables sql unable to handle variable dependencies. (#48483))
private void handleCreateTableAsSelectStmt(long beginTimeInNanoSecond) throws Exception {
CreateTableAsSelectStmt createTableAsSelectStmt = (CreateTableAsSelectStmt) parsedStmt;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,7 @@ public Void visitVariableExpr(VariableExpr node, Scope context) {
return null;
}

<<<<<<< HEAD
Type variableType = userVariable.getResolvedExpression().getType();
node.setType(variableType);

Expand All @@ -1090,6 +1091,21 @@ public Void visitVariableExpr(VariableExpr node, Scope context) {
}
} catch (AnalysisException | DdlException e) {
throw new SemanticException(e.getMessage());
=======
@Override
public Void visitUserVariableExpr(UserVariableExpr node, Scope context) {
UserVariable userVariable;
if (session.getUserVariablesCopyInWrite() == null) {
userVariable = session.getUserVariable(node.getName());
} else {
userVariable = session.getUserVariableCopyInWrite(node.getName());
}

if (userVariable == null) {
node.setValue(NullLiteral.create(Type.STRING));
} else {
node.setValue(userVariable.getEvaluatedExpression());
>>>>>>> dc40504bac ([BugFix] fix an issue that user-defined variables sql unable to handle variable dependencies. (#48483))
}
return null;
}
Expand Down
Loading

0 comments on commit 5020f14

Please sign in to comment.