Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Dec 12, 2024
1 parent b6b56f5 commit 007c152
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public interface Explainable {
Plan getExplainPlan(ConnectContext ctx) throws Exception;

default Optional<NereidsPlanner> getPlanner(LogicalPlan logicalPlan, StatementContext ctx) throws Exception {
default Optional<NereidsPlanner> getExplainPlanner(LogicalPlan logicalPlan, StatementContext ctx) throws Exception {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
}
Explainable explainable = (Explainable) logicalPlan;
explainPlan = ((LogicalPlan) explainable.getExplainPlan(ctx));
NereidsPlanner planner = explainable.getPlanner(explainPlan, ctx.getStatementContext()).orElseGet(() ->
NereidsPlanner planner = explainable.getExplainPlanner(explainPlan, ctx.getStatementContext()).orElseGet(() ->
new NereidsPlanner(ctx.getStatementContext())
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ public Plan getExplainPlan(ConnectContext ctx) {
}

@Override
public Optional<NereidsPlanner> getPlanner(LogicalPlan logicalPlan, StatementContext ctx) {
public Optional<NereidsPlanner> getExplainPlanner(LogicalPlan logicalPlan, StatementContext ctx) {
ConnectContext connectContext = ctx.getConnectContext();
TableIf targetTableIf = InsertUtils.getTargetTable(originLogicalQuery, connectContext);
boolean supportFastInsertIntoValues = supportFastInsertIntoValues(logicalPlan, targetTableIf, connectContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.doris.nereids.rules.rewrite.MergeProjects;
import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
Expand Down Expand Up @@ -96,9 +95,9 @@ public Rule build() {
List<List<NamedExpression>> castedRows = castedConstantsAndNullables.key();
List<Boolean> nullables = castedConstantsAndNullables.value();
List<NamedExpression> outputs = Lists.newArrayList();
List<Slot> inlineTableOutput = inlineTable.getOutput();
for (int columnId = 0; columnId < inlineTableOutput.size(); columnId++) {
String name = originConstants.get(0).get(columnId).getName();
List<NamedExpression> firstRow = originConstants.get(0);
for (int columnId = 0; columnId < firstRow.size(); columnId++) {
String name = firstRow.get(columnId).getName();
DataType commonDataType = castedRows.get(0).get(columnId).getDataType();
outputs.add(new SlotReference(name, commonDataType, nullables.get(columnId)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,23 @@
import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
Expand Down Expand Up @@ -82,35 +86,43 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS

private static final Logger LOG = LogManager.getLogger(InsertOverwriteTableCommand.class);

private LogicalPlan logicalQuery;
private LogicalPlan originLogicalQuery;
private Optional<LogicalPlan> logicalQuery;
private Optional<String> labelName;
private final Optional<LogicalPlan> cte;
private AtomicBoolean isCancelled = new AtomicBoolean(false);
private AtomicBoolean isRunning = new AtomicBoolean(false);
private Optional<CascadesContext> analyzeContext;

/**
* constructor
*/
public InsertOverwriteTableCommand(LogicalPlan logicalQuery, Optional<String> labelName,
Optional<LogicalPlan> cte) {
super(PlanType.INSERT_INTO_TABLE_COMMAND);
this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
this.originLogicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
this.logicalQuery = Optional.empty();
this.labelName = Objects.requireNonNull(labelName, "labelName should not be null");
this.cte = cte;
this.analyzeContext = Optional.empty();
}

public void setLabelName(Optional<String> labelName) {
this.labelName = labelName;
}

public boolean isAutoDetectOverwrite() {
public boolean isAutoDetectOverwrite(LogicalPlan logicalQuery) {
return (logicalQuery instanceof UnboundTableSink)
&& ((UnboundTableSink<?>) this.logicalQuery).isAutoDetectPartition();
&& ((UnboundTableSink<?>) logicalQuery).isAutoDetectPartition();
}

public LogicalPlan getLogicalQuery() {
return logicalQuery.orElse(originLogicalQuery);
}

@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
TableIf targetTableIf = InsertUtils.getTargetTable(originLogicalQuery, ctx);
//check allow insert overwrite
if (!allowInsertOverwrite(targetTableIf)) {
String errMsg = "insert into overwrite only support OLAP and HMS/ICEBERG table."
Expand All @@ -122,13 +134,20 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (targetTableIf instanceof MTMV && !MTMVUtil.allowModifyMTMVData(ctx)) {
throw new AnalysisException("Not allowed to perform current operation on async materialized view");
}
this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(
logicalQuery, targetTableIf, Optional.empty(), Optional.empty());
this.analyzeContext = Optional.of(
CascadesContext.initContext(ctx.getStatementContext(), originLogicalQuery, PhysicalProperties.ANY)
);
this.logicalQuery = Optional.of((LogicalPlan) InsertUtils.normalizePlan(
originLogicalQuery, targetTableIf, analyzeContext, Optional.empty()));
if (cte.isPresent()) {
this.logicalQuery = (LogicalPlan) logicalQuery.withChildren(cte.get().withChildren(
this.logicalQuery.child(0)));
LogicalPlan logicalQuery = this.logicalQuery.get();
this.logicalQuery = Optional.of(
(LogicalPlan) logicalQuery.withChildren(
cte.get().withChildren(logicalQuery.child(0))
)
);
}

LogicalPlan logicalQuery = this.logicalQuery.get();
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
Expand Down Expand Up @@ -173,7 +192,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
isRunning.set(true);
long taskId = 0;
try {
if (isAutoDetectOverwrite()) {
if (isAutoDetectOverwrite(getLogicalQuery())) {
// taskId here is a group id. it contains all replace tasks made and registered in rpc process.
taskId = insertOverwriteManager.registerTaskGroup();
// When inserting, BE will call to replace partition by FrontendService. FE will register new temp
Expand Down Expand Up @@ -220,7 +239,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
}
} catch (Exception e) {
LOG.warn("insert into overwrite failed with task(or group) id " + taskId);
if (isAutoDetectOverwrite()) {
if (isAutoDetectOverwrite(getLogicalQuery())) {
insertOverwriteManager.taskGroupFail(taskId);
} else {
insertOverwriteManager.taskFail(taskId);
Expand Down Expand Up @@ -288,6 +307,7 @@ private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, Lis
// copy sink tot replace by tempPartitions
UnboundLogicalSink<?> copySink;
InsertCommandContext insertCtx;
LogicalPlan logicalQuery = getLogicalQuery();
if (logicalQuery instanceof UnboundTableSink) {
UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
copySink = (UnboundLogicalSink<?>) UnboundTableSinkCreator.createUnboundTableSink(
Expand Down Expand Up @@ -343,6 +363,7 @@ private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, Lis
*/
private void insertIntoAutoDetect(ConnectContext ctx, StmtExecutor executor, long groupId) throws Exception {
InsertCommandContext insertCtx;
LogicalPlan logicalQuery = getLogicalQuery();
if (logicalQuery instanceof UnboundTableSink) {
// 1. when overwrite auto-detect, allow auto partition or not is controlled by session variable.
// 2. we save and pass overwrite auto detect by insertCtx
Expand All @@ -363,7 +384,20 @@ private void insertIntoAutoDetect(ConnectContext ctx, StmtExecutor executor, lon

@Override
public Plan getExplainPlan(ConnectContext ctx) {
return InsertUtils.getPlanForExplain(ctx, Optional.empty(), this.logicalQuery);
return InsertUtils.getPlanForExplain(ctx, Optional.empty(), getLogicalQuery());
}

@Override
public Optional<NereidsPlanner> getExplainPlanner(LogicalPlan logicalPlan, StatementContext ctx) {
LogicalPlan logicalQuery = getLogicalQuery();
if (logicalQuery instanceof UnboundTableSink) {
boolean allowAutoPartition = ctx.getConnectContext().getSessionVariable().isEnableAutoCreateWhenOverwrite();
OlapInsertCommandContext insertCtx = new OlapInsertCommandContext(allowAutoPartition, true);
InsertIntoTableCommand insertIntoTableCommand = new InsertIntoTableCommand(
logicalQuery, labelName, Optional.of(insertCtx), Optional.empty());
return insertIntoTableCommand.getExplainPlanner(logicalPlan, ctx);
}
return Optional.empty();
}

@Override
Expand All @@ -375,4 +409,10 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
public StmtType stmtType() {
return StmtType.INSERT;
}

public boolean supportFastInsertIntoValues(LogicalPlan logicalPlan, TableIf targetTableIf, ConnectContext ctx) {
return logicalPlan instanceof UnboundTableSink && logicalPlan.child(0) instanceof LogicalInlineTable
&& targetTableIf instanceof OlapTable
&& ctx != null && ctx.getSessionVariable().isEnableFastAnalyzeInsertIntoValues();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.InsertStreamTxnExecutor;
Expand All @@ -92,7 +91,6 @@
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -368,16 +366,11 @@ public static Plan normalizePlan(LogicalPlan plan, TableIf table,
);
}

Optional<ExpressionAnalyzer> analyzer = analyzeContext.map(
Optional<ExpressionAnalyzer> analyzer = Optional.empty();
analyzeContext.map(
cascadesContext -> buildExprAnalyzer(plan, cascadesContext)
);

int insertColumnNum = CollectionUtils.isEmpty(unboundLogicalSink.getColNames())
? columns.size()
: unboundInlineTable.getConstantExprsList().get(0).size();
Boolean[] outputSlotNullables = new Boolean[insertColumnNum];
Arrays.fill(outputSlotNullables, false);

for (List<NamedExpression> values : unboundInlineTable.getConstantExprsList()) {
ImmutableList.Builder<NamedExpression> optimizedRowConstructor = ImmutableList.builder();
if (values.isEmpty()) {
Expand All @@ -387,9 +380,7 @@ public static Plan normalizePlan(LogicalPlan plan, TableIf table,
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
NamedExpression defaultExpression = generateDefaultExpression(column);
addColumnValue(
analyzer, optimizedRowConstructor, outputSlotNullables, i, defaultExpression
);
addColumnValue(analyzer, optimizedRowConstructor, defaultExpression);
}
} else {
if (CollectionUtils.isNotEmpty(unboundLogicalSink.getColNames())) {
Expand All @@ -416,18 +407,14 @@ public static Plan normalizePlan(LogicalPlan plan, TableIf table,
}
if (values.get(i) instanceof DefaultValueSlot) {
NamedExpression defaultExpression = generateDefaultExpression(sameNameColumn);
addColumnValue(
analyzer, optimizedRowConstructor, outputSlotNullables, i, defaultExpression
);
addColumnValue(analyzer, optimizedRowConstructor, defaultExpression);
} else {
DataType targetType = DataType.fromCatalogType(sameNameColumn.getType());
Expression castValue = castValue(values.get(i), targetType);
castValue = rewriteContext == null
? castValue
: FoldConstantRuleOnFE.evaluate(castValue, rewriteContext);
addColumnValue(analyzer,
optimizedRowConstructor, outputSlotNullables, i, (NamedExpression) castValue
);
addColumnValue(analyzer, optimizedRowConstructor, (NamedExpression) castValue);
}
}
} else {
Expand All @@ -443,29 +430,21 @@ public static Plan normalizePlan(LogicalPlan plan, TableIf table,
}
if (values.get(i) instanceof DefaultValueSlot) {
NamedExpression defaultExpression = generateDefaultExpression(columns.get(i));
addColumnValue(
analyzer, optimizedRowConstructor, outputSlotNullables, i, defaultExpression
);
addColumnValue(analyzer, optimizedRowConstructor, defaultExpression);
} else {
DataType targetType = DataType.fromCatalogType(columns.get(i).getType());
Expression castValue = castValue(values.get(i), targetType);
castValue = rewriteContext == null
? castValue
: FoldConstantRuleOnFE.evaluate(castValue, rewriteContext);
addColumnValue(
analyzer, optimizedRowConstructor,
outputSlotNullables, i, (NamedExpression) castValue
);
addColumnValue(analyzer, optimizedRowConstructor, (NamedExpression) castValue);
}
}
}
}
optimizedRowConstructors.add(optimizedRowConstructor.build());
}

return plan.withChildren(new LogicalInlineTable(optimizedRowConstructors.build(), Optional.of(
Utils.fastToImmutableList(outputSlotNullables)
)));
return plan.withChildren(new UnboundInlineTable(optimizedRowConstructors.build()));
}

/** buildAnalyzer */
Expand Down Expand Up @@ -538,15 +517,14 @@ public Expression visitUnboundStar(UnboundStar unboundStar, ExpressionRewriteCon
private static void addColumnValue(
Optional<ExpressionAnalyzer> analyzer,
ImmutableList.Builder<NamedExpression> optimizedRowConstructor,
Boolean[] nullable, int index, NamedExpression value) {
NamedExpression value) {
if (analyzer.isPresent() && !(value instanceof Alias && value.child(0) instanceof Literal)) {
ExpressionAnalyzer expressionAnalyzer = analyzer.get();
value = (NamedExpression) expressionAnalyzer.analyze(
value, new ExpressionRewriteContext(expressionAnalyzer.getCascadesContext())
);
}
optimizedRowConstructor.add(value);
nullable[index] |= value.nullable();
}

private static Expression castValue(Expression value, DataType targetType) {
Expand Down
Loading

0 comments on commit 007c152

Please sign in to comment.