[Enhancement] Support insert match column by name #51181

Merged · 1 commit · Oct 24, 2024
ErrorCode.java

@@ -320,7 +320,7 @@ public enum ErrorCode {
"No partitions have data available for loading. If you are sure there may be no data to be loaded, " +
"you can use `ADMIN SET FRONTEND CONFIG ('empty_load_as_error' = 'false')` " +
"to ensure such load jobs can succeed"),
-    ERR_INSERTED_COLUMN_MISMATCH(5604, new byte[] {'2', '2', '0', '0', '0'},
+    ERR_INSERT_COLUMN_COUNT_MISMATCH(5604, new byte[] {'2', '2', '0', '0', '0'},
Contributor: Is 42601 more suitable?

"Inserted target column count: %d doesn't match select/value column count: %d"),
ERR_ILLEGAL_BYTES_LENGTH(5605, new byte[] {'4', '2', '0', '0', '0'}, "The valid bytes length for '%s' is [%d, %d]"),
ERR_TOO_MANY_ERROR_ROWS(5606, new byte[] {'2', '2', '0', '0', '0'},
@@ -329,6 +329,7 @@ public enum ErrorCode {
ERR_ROUTINE_LOAD_OFFSET_INVALID(5607, new byte[] {'0', '2', '0', '0', '0'},
"Consume offset: %d is greater than the latest offset: %d in kafka partition: %d. " +
"You can modify 'kafka_offsets' property through ALTER ROUTINE LOAD and RESUME the job"),
+    ERR_INSERT_COLUMN_NAME_MISMATCH(5608, new byte[] {'2', '2', '0', '0', '0'}, "%s column: %s has no matching %s column"),
Contributor: 42601?


/**
* 5700 - 5799: Partition
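For reference, a minimal sketch (assuming the templates are rendered with plain String.format semantics, which the %d/%s placeholders suggest) of how the renamed 5604 message and the new 5608 message read once formatted:

public class ErrorMessageDemo {
    public static void main(String[] args) {
        // Renamed 5604 template (count mismatch) with example counts 3 vs 2
        System.out.println(String.format(
                "Inserted target column count: %d doesn't match select/value column count: %d", 3, 2));
        // New 5608 template (name mismatch) with a hypothetical unmatched column c2
        System.out.println(String.format(
                "%s column: %s has no matching %s column", "Target", "c2", "source"));
        // -> Inserted target column count: 3 doesn't match select/value column count: 2
        // -> Target column: c2 has no matching source column
    }
}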
fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java (49 additions, 30 deletions)

@@ -17,6 +17,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.starrocks.alter.SchemaChangeHandler;
import com.starrocks.analysis.DescriptorTable;
@@ -592,8 +593,31 @@ private void castLiteralToTargetColumnsType(InsertStmt insertStatement) {

private OptExprBuilder fillDefaultValue(LogicalPlan logicalPlan, ColumnRefFactory columnRefFactory,
InsertStmt insertStatement, List<ColumnRefOperator> outputColumns) {
-        Map<ColumnRefOperator, ScalarOperator> columnRefMap = new HashMap<>();
+        // targetColumnNames is used to check whether a schema column appears in the target column list
+        Set<String> targetColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
+        targetColumnNames.addAll(
+                insertStatement.getTargetColumnNames() != null ? insertStatement.getTargetColumnNames() :
+                        outputBaseSchema.stream().map(Column::getName).collect(Collectors.toList()));
+
+        // sourceColumnMappedNames holds the mapped source-column name for each target column:
+        // 1. if matching by position, the mapped names are converted from the target column names one by one;
+        // 2. if matching by name, the mapped names are the source column names themselves.
+        Map<String, Integer> mappedColumnToSourceIdx = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        List<String> sourceColumnMappedNames = null;
+        if (insertStatement.isColumnMatchByPosition()) {
+            sourceColumnMappedNames = insertStatement.getTargetColumnNames() != null ?
+                    insertStatement.getTargetColumnNames() :
+                    outputBaseSchema.stream().map(Column::getName).collect(Collectors.toList());
+        } else {
+            Preconditions.checkState(insertStatement.isColumnMatchByName());
+            sourceColumnMappedNames = insertStatement.getQueryStatement().getQueryRelation().getColumnOutputNames();
+        }
+        Preconditions.checkState(sourceColumnMappedNames != null);
+        for (int columnIdx = 0; columnIdx < sourceColumnMappedNames.size(); ++columnIdx) {
+            mappedColumnToSourceIdx.put(sourceColumnMappedNames.get(columnIdx), columnIdx);
+        }
+
+        // generate columnRefMap (fill default values)
+        Map<ColumnRefOperator, ScalarOperator> columnRefMap = new HashMap<>();
for (int columnIdx = 0; columnIdx < outputBaseSchema.size(); ++columnIdx) {
if (needToSkip(insertStatement, columnIdx)) {
continue;
@@ -603,40 +627,35 @@ private OptExprBuilder fillDefaultValue(LogicalPlan logicalPlan, ColumnRefFactory columnRefFactory,
if (targetColumn.isGeneratedColumn()) {
continue;
}
-            if (insertStatement.getTargetColumnNames() == null) {
-                outputColumns.add(logicalPlan.getOutputColumn().get(columnIdx));
-                columnRefMap.put(logicalPlan.getOutputColumn().get(columnIdx),
-                        logicalPlan.getOutputColumn().get(columnIdx));
+            String targetColumnName = targetColumn.getName();
+            if (mappedColumnToSourceIdx.containsKey(targetColumnName) && targetColumnNames.contains(targetColumnName)) {
+                ColumnRefOperator col = logicalPlan.getOutputColumn().get(mappedColumnToSourceIdx.get(targetColumnName));
+                outputColumns.add(col);
+                columnRefMap.put(col, col);
             } else {
-                int idx = insertStatement.getTargetColumnNames().indexOf(targetColumn.getName().toLowerCase());
-                if (idx == -1) {
-                    ScalarOperator scalarOperator;
-                    Column.DefaultValueType defaultValueType = targetColumn.getDefaultValueType();
-                    if (defaultValueType == Column.DefaultValueType.NULL || targetColumn.isAutoIncrement()) {
-                        scalarOperator = ConstantOperator.createNull(targetColumn.getType());
-                    } else if (defaultValueType == Column.DefaultValueType.CONST) {
-                        scalarOperator = ConstantOperator.createVarchar(targetColumn.calculatedDefaultValue());
-                    } else if (defaultValueType == Column.DefaultValueType.VARY) {
-                        if (SUPPORTED_DEFAULT_FNS.contains(targetColumn.getDefaultExpr().getExpr())) {
-                            scalarOperator = SqlToScalarOperatorTranslator.
-                                    translate(targetColumn.getDefaultExpr().obtainExpr());
-                        } else {
-                            throw new SemanticException(
-                                    "Column:" + targetColumn.getName() + " has unsupported default value:"
-                                            + targetColumn.getDefaultExpr().getExpr());
-                        }
-                    } else {
-                        throw new SemanticException("Unknown default value type:%s", defaultValueType.toString());
-                    }
-                    ColumnRefOperator col = columnRefFactory
-                            .create(scalarOperator, scalarOperator.getType(), scalarOperator.isNullable());
-                    outputColumns.add(col);
-                    columnRefMap.put(col, scalarOperator);
-                } else {
-                    outputColumns.add(logicalPlan.getOutputColumn().get(idx));
-                    columnRefMap.put(logicalPlan.getOutputColumn().get(idx), logicalPlan.getOutputColumn().get(idx));
-                }
+                ScalarOperator scalarOperator;
+                Column.DefaultValueType defaultValueType = targetColumn.getDefaultValueType();
+                if (defaultValueType == Column.DefaultValueType.NULL || targetColumn.isAutoIncrement()) {
+                    scalarOperator = ConstantOperator.createNull(targetColumn.getType());
+                } else if (defaultValueType == Column.DefaultValueType.CONST) {
+                    scalarOperator = ConstantOperator.createVarchar(targetColumn.calculatedDefaultValue());
+                } else if (defaultValueType == Column.DefaultValueType.VARY) {
+                    if (SUPPORTED_DEFAULT_FNS.contains(targetColumn.getDefaultExpr().getExpr())) {
+                        scalarOperator = SqlToScalarOperatorTranslator.
+                                translate(targetColumn.getDefaultExpr().obtainExpr());
+                    } else {
+                        throw new SemanticException("Column:" + targetColumnName + " has unsupported default value:"
+                                + targetColumn.getDefaultExpr().getExpr());
+                    }
+                } else {
+                    throw new SemanticException("Unknown default value type:%s", defaultValueType.toString());
+                }
+                ColumnRefOperator col = columnRefFactory
+                        .create(scalarOperator, scalarOperator.getType(), scalarOperator.isNullable());
+                outputColumns.add(col);
+                columnRefMap.put(col, scalarOperator);
             }
         }
return logicalPlan.getRootBuilder().withNewRoot(new LogicalProjectOperator(new HashMap<>(columnRefMap)));
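The core of the change above is the name-based mapping: a case-insensitive map from source output names to positions, from which each schema column either takes its matching source column or falls back to its default. A standalone sketch of just that strategy, using hypothetical names rather than the PR's planner classes:

import java.util.*;

// Simplified stand-in for fillDefaultValue's match-by-name resolution.
public class MatchByNameSketch {
    static List<String> plan(List<String> schemaColumns,
                             List<String> sourceOutputNames,
                             Map<String, String> defaults) {
        // Case-insensitive map from source output name to its position
        Map<String, Integer> mappedColumnToSourceIdx = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (int i = 0; i < sourceOutputNames.size(); ++i) {
            mappedColumnToSourceIdx.put(sourceOutputNames.get(i), i);
        }
        List<String> result = new ArrayList<>();
        for (String column : schemaColumns) {
            Integer idx = mappedColumnToSourceIdx.get(column);
            // Matched columns read from the source; unmatched ones take their default
            result.add(column + " <- " + (idx != null
                    ? "source[" + idx + "]"
                    : "default(" + defaults.getOrDefault(column, "NULL") + ")"));
        }
        return result;
    }

    public static void main(String[] args) {
        // c3 is not produced by the query, so it is filled with its default value
        System.out.println(plan(
                List.of("c1", "c2", "c3"),
                List.of("C2", "c1"),      // order differs; match is by name, case-insensitive
                Map.of("c3", "0")));
        // -> [c1 <- source[1], c2 <- source[0], c3 <- default(0)]
    }
}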
Insert analyzer (analyzeWithDeferredLock / analyzeProperties)

@@ -46,6 +46,7 @@
import com.starrocks.sql.ast.DefaultValueExpr;
import com.starrocks.sql.ast.FileTableFunctionRelation;
import com.starrocks.sql.ast.InsertStmt;
+import com.starrocks.sql.ast.InsertStmt.ColumnMatchPolicy;
import com.starrocks.sql.ast.LoadStmt;
import com.starrocks.sql.ast.PartitionNames;
import com.starrocks.sql.ast.QueryRelation;
@@ -213,13 +214,11 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext
if (insertStmt.getTargetColumnNames() == null) {
if (table instanceof OlapTable) {
targetColumns = new ArrayList<>(((OlapTable) table).getBaseSchemaWithoutGeneratedColumn());
-                mentionedColumns =
-                        ((OlapTable) table).getBaseSchemaWithoutGeneratedColumn().stream()
-                                .map(Column::getName).collect(Collectors.toSet());
+                mentionedColumns.addAll(((OlapTable) table).getBaseSchemaWithoutGeneratedColumn().stream()
+                        .map(Column::getName).collect(Collectors.toSet()));

Contributor: Can you tell me the reason to use addAll()?

Contributor Author (@wyb, Oct 24, 2024): mentionedColumns is a CASE_INSENSITIVE_ORDER set; Collectors.toSet() returns a hash set.
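The reply above is the whole point of the addAll() change; a tiny sketch showing the behavioral difference between keeping the case-insensitive set and reassigning it to the HashSet that Collectors.toSet() produces:

import java.util.*;
import java.util.stream.*;

public class CaseInsensitiveSetDemo {
    public static void main(String[] args) {
        // addAll() keeps the CASE_INSENSITIVE_ORDER semantics of the TreeSet
        Set<String> mentionedColumns = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        mentionedColumns.addAll(Stream.of("C1", "c2").collect(Collectors.toSet()));
        System.out.println(mentionedColumns.contains("c1")); // true: lookup ignores case

        // Reassignment would silently swap in a case-sensitive HashSet
        Set<String> reassigned = Stream.of("C1", "c2").collect(Collectors.toSet());
        System.out.println(reassigned.contains("c1")); // false: HashSet compares exactly
    }
}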
} else {
targetColumns = new ArrayList<>(table.getBaseSchema());
-                mentionedColumns =
-                        table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet());
+                mentionedColumns.addAll(table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet()));
}
} else {
targetColumns = new ArrayList<>();
@@ -235,7 +234,7 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext
throw new SemanticException("generated column '%s' can not be specified", colName);
}
if (!mentionedColumns.add(colName)) {
throw new SemanticException("Column '%s' specified twice", colName);
ErrorReport.reportSemanticException(ErrorCode.ERR_DUP_FIELDNAME, colName);
}
requiredKeyColumns.remove(colName.toLowerCase());
targetColumns.add(column);
@@ -274,13 +273,40 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext
if ((table.isIcebergTable() || table.isHiveTable()) && insertStmt.isStaticKeyPartitionInsert()) {
// full column size = mentioned column size + partition column size for static partition insert
mentionedColumnSize -= table.getPartitionColumnNames().size();
+            mentionedColumns.removeAll(table.getPartitionColumnNames());
}

// check target and source columns match
QueryRelation query = insertStmt.getQueryStatement().getQueryRelation();
-        if (query.getRelationFields().size() != mentionedColumnSize) {
-            ErrorReport.reportSemanticException(ErrorCode.ERR_INSERTED_COLUMN_MISMATCH, mentionedColumnSize,
-                    query.getRelationFields().size());
+        if (insertStmt.isColumnMatchByPosition()) {
+            if (query.getRelationFields().size() != mentionedColumnSize) {
+                ErrorReport.reportSemanticException(ErrorCode.ERR_INSERT_COLUMN_COUNT_MISMATCH, mentionedColumnSize,
+                        query.getRelationFields().size());
+            }
+        } else {
+            Preconditions.checkState(insertStmt.isColumnMatchByName());
+            if (query instanceof ValuesRelation) {
+                throw new SemanticException("Insert match column by name does not support values()");
+            }

Contributor: Why not use ErrorReport.reportSemanticException(42601, ...)?

+            Set<String> selectColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
+            for (String colName : insertStmt.getQueryStatement().getQueryRelation().getColumnOutputNames()) {
+                if (!selectColumnNames.add(colName)) {
+                    ErrorReport.reportSemanticException(ErrorCode.ERR_DUP_FIELDNAME, colName);
+                }
+            }
+            if (!selectColumnNames.containsAll(mentionedColumns)) {
+                mentionedColumns.removeAll(selectColumnNames);
+                ErrorReport.reportSemanticException(
+                        ErrorCode.ERR_INSERT_COLUMN_NAME_MISMATCH, "Target", String.join(", ", mentionedColumns), "source");
+            }
+            if (!mentionedColumns.containsAll(selectColumnNames)) {
+                selectColumnNames.removeAll(mentionedColumns);
+                ErrorReport.reportSemanticException(
+                        ErrorCode.ERR_INSERT_COLUMN_NAME_MISMATCH, "Source", String.join(", ", selectColumnNames), "target");
+            }
+        }

// check default value expr
if (query instanceof ValuesRelation) {
ValuesRelation valuesRelation = (ValuesRelation) query;
@@ -308,15 +334,29 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext

private static void analyzeProperties(InsertStmt insertStmt, ConnectContext session) {
Map<String, String> properties = insertStmt.getProperties();

+        // 'match_column_by' property:
+        // parse and remove the property so that 'LoadStmt.checkProperties' validation still passes
+        if (properties.containsKey(InsertStmt.PROPERTY_MATCH_COLUMN_BY)) {
+            String property = properties.remove(InsertStmt.PROPERTY_MATCH_COLUMN_BY);
+            ColumnMatchPolicy columnMatchPolicy = ColumnMatchPolicy.fromString(property);
+            if (columnMatchPolicy == null) {
+                String msg = String.format("%s (case insensitive)", String.join(", ", ColumnMatchPolicy.getCandidates()));
+                ErrorReport.reportSemanticException(
+                        ErrorCode.ERR_INVALID_VALUE, InsertStmt.PROPERTY_MATCH_COLUMN_BY, property, msg);
+            }
+            insertStmt.setColumnMatchPolicy(columnMatchPolicy);
+        }

+        // check common properties
// use session variable if not set max_filter_ratio property
if (!properties.containsKey(LoadStmt.MAX_FILTER_RATIO_PROPERTY)) {
properties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY,
String.valueOf(session.getSessionVariable().getInsertMaxFilterRatio()));
}
// use session variable if not set strict_mode property
-        if (!properties.containsKey(LoadStmt.STRICT_MODE) &&
-                session.getSessionVariable().getEnableInsertStrict()) {
-            properties.put(LoadStmt.STRICT_MODE, "true");
+        if (!properties.containsKey(LoadStmt.STRICT_MODE)) {
+            properties.put(LoadStmt.STRICT_MODE, String.valueOf(session.getSessionVariable().getEnableInsertStrict()));
}

try {
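To summarize the match-by-name validation added in this file: the mentioned target columns and the SELECT output columns must cover each other exactly, compared case-insensitively, with any leftovers reported through ERR_INSERT_COLUMN_NAME_MISMATCH. A self-contained sketch of that check (hypothetical class and exception, not the PR's code):

import java.util.*;

public class NameMatchCheckSketch {
    static void check(Collection<String> targetCols, Collection<String> selectCols) {
        Set<String> mentioned = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        mentioned.addAll(targetCols);
        Set<String> selected = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        selected.addAll(selectCols);

        if (!selected.containsAll(mentioned)) {
            mentioned.removeAll(selected); // leftover = unmatched target columns
            throw new IllegalArgumentException(
                    "Target column: " + String.join(", ", mentioned) + " has no matching source column");
        }
        if (!mentioned.containsAll(selected)) {
            selected.removeAll(mentioned); // leftover = unmatched source columns
            throw new IllegalArgumentException(
                    "Source column: " + String.join(", ", selected) + " has no matching target column");
        }
    }

    public static void main(String[] args) {
        check(List.of("c1", "c2"), List.of("C2", "c1")); // passes: match ignores case and order
        try {
            check(List.of("c1", "c2"), List.of("c1"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Target column: c2 has no matching source column
        }
    }
}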
fe/fe-core/src/main/java/com/starrocks/sql/ast/InsertStmt.java (35 additions, 0 deletions)

@@ -29,6 +29,7 @@
import com.starrocks.sql.parser.NodePosition;

import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -53,6 +54,7 @@
*/
public class InsertStmt extends DmlStmt {
public static final String STREAMING = "STREAMING";
+    public static final String PROPERTY_MATCH_COLUMN_BY = "match_column_by";

private final TableName tblName;
private PartitionNames targetPartitionNames;
@@ -96,6 +98,9 @@ public class InsertStmt extends DmlStmt {

private boolean isVersionOverwrite = false;

+    // column match by position or name
+    private ColumnMatchPolicy columnMatchPolicy = ColumnMatchPolicy.POSITION;

public InsertStmt(TableName tblName, PartitionNames targetPartitionNames, String label, List<String> cols,
QueryStatement queryStatement, boolean isOverwrite, Map<String, String> insertProperties,
NodePosition pos) {
@@ -328,4 +333,34 @@ public Table makeTableFunctionTable(SessionVariable sessionVariable) {
List<Column> columns = collectSelectedFieldsFromQueryStatement();
return new TableFunctionTable(columns, getTableFunctionProperties(), sessionVariable);
}

+    public enum ColumnMatchPolicy {
+        POSITION,
+        NAME;
+
+        public static ColumnMatchPolicy fromString(String value) {
+            for (ColumnMatchPolicy policy : values()) {
+                if (policy.name().equalsIgnoreCase(value)) {
+                    return policy;
+                }
+            }
+            return null;
+        }
+
+        public static List<String> getCandidates() {
+            return Arrays.stream(values()).map(p -> p.name().toLowerCase()).collect(Collectors.toList());
+        }
+    }
+
+    public boolean isColumnMatchByPosition() {
+        return columnMatchPolicy == ColumnMatchPolicy.POSITION;
+    }
+
+    public boolean isColumnMatchByName() {
+        return columnMatchPolicy == ColumnMatchPolicy.NAME;
+    }
+
+    public void setColumnMatchPolicy(ColumnMatchPolicy columnMatchPolicy) {
+        this.columnMatchPolicy = columnMatchPolicy;
+    }
}
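A short demo of the new enum's behavior, using a self-contained copy of ColumnMatchPolicy; the SQL-level trigger would be the match_column_by insert property parsed in analyzeProperties above (the exact INSERT syntax is not quoted in this diff, so it is not shown here):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ColumnMatchPolicyDemo {
    // Mirror of the PR's enum, reproduced only to keep the demo runnable
    public enum ColumnMatchPolicy {
        POSITION,
        NAME;

        public static ColumnMatchPolicy fromString(String value) {
            for (ColumnMatchPolicy policy : values()) {
                if (policy.name().equalsIgnoreCase(value)) {
                    return policy;
                }
            }
            return null; // the analyzer turns this into ERR_INVALID_VALUE
        }

        public static List<String> getCandidates() {
            return Arrays.stream(values()).map(p -> p.name().toLowerCase()).collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        System.out.println(ColumnMatchPolicy.fromString("Name"));  // NAME (parsing is case-insensitive)
        System.out.println(ColumnMatchPolicy.fromString("bogus")); // null
        System.out.println(ColumnMatchPolicy.getCandidates());     // [position, name]
    }
}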