Skip to content

Commit

Permalink
[Enhancement] Support insert match column by name
Browse files Browse the repository at this point in the history
Signed-off-by: wyb <[email protected]>
  • Loading branch information
wyb committed Sep 20, 2024
1 parent 0df6bd2 commit ad739d2
Show file tree
Hide file tree
Showing 6 changed files with 310 additions and 43 deletions.
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public enum ErrorCode {
"No partitions have data available for loading. If you are sure there may be no data to be loaded, " +
"you can use `ADMIN SET FRONTEND CONFIG ('empty_load_as_error' = 'false')` " +
"to ensure such load jobs can succeed"),
ERR_INSERTED_COLUMN_MISMATCH(5604, new byte[] {'2', '2', '0', '0', '0'},
ERR_INSERT_COLUMN_COUNT_MISMATCH(5604, new byte[] {'2', '2', '0', '0', '0'},
"Inserted target column count: %d doesn't match select/value column count: %d"),
ERR_ILLEGAL_BYTES_LENGTH(5605, new byte[] {'4', '2', '0', '0', '0'}, "The valid bytes length for '%s' is [%d, %d]"),
ERR_TOO_MANY_ERROR_ROWS(5606, new byte[] {'2', '2', '0', '0', '0'},
Expand All @@ -329,6 +329,7 @@ public enum ErrorCode {
ERR_ROUTINE_LOAD_OFFSET_INVALID(5607, new byte[] {'0', '2', '0', '0', '0'},
"Consume offset: %d is greater than the latest offset: %d in kafka partition: %d. " +
"You can modify 'kafka_offsets' property through ALTER ROUTINE LOAD and RESUME the job"),
ERR_INSERT_COLUMN_NAME_MISMATCH(5608, new byte[] {'2', '2', '0', '0', '0'}, "%s column: %s has no matching %s column"),

/**
* 5700 - 5799: Partition
Expand Down
79 changes: 49 additions & 30 deletions fe/fe-core/src/main/java/com/starrocks/sql/InsertPlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.starrocks.alter.SchemaChangeHandler;
import com.starrocks.analysis.DescriptorTable;
Expand Down Expand Up @@ -592,8 +593,31 @@ private void castLiteralToTargetColumnsType(InsertStmt insertStatement) {

private OptExprBuilder fillDefaultValue(LogicalPlan logicalPlan, ColumnRefFactory columnRefFactory,
InsertStmt insertStatement, List<ColumnRefOperator> outputColumns) {
Map<ColumnRefOperator, ScalarOperator> columnRefMap = new HashMap<>();
// targetColumnNames is for check whether schema column is in target column list or not
Set<String> targetColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
targetColumnNames.addAll(
insertStatement.getTargetColumnNames() != null ? insertStatement.getTargetColumnNames() :
outputBaseSchema.stream().map(Column::getName).collect(Collectors.toList()));

// sourceColumnMappedNames is the mapped name of source columns corresponding to the target columns.
// 1. if match by position, source mapped column name can be converted from target column name one by one.
// 2. if match by name, source mapped column name is same as the source column name.
Map<String, Integer> mappedColumnToSourceIdx = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
List<String> sourceColumnMappedNames = null;
if (insertStatement.isColumnMatchByPosition()) {
sourceColumnMappedNames = insertStatement.getTargetColumnNames() != null ? insertStatement.getTargetColumnNames() :
outputBaseSchema.stream().map(Column::getName).collect(Collectors.toList());
} else {
Preconditions.checkState(insertStatement.isColumnMatchByName());
sourceColumnMappedNames = insertStatement.getQueryStatement().getQueryRelation().getColumnOutputNames();
}
Preconditions.checkState(sourceColumnMappedNames != null);
for (int columnIdx = 0; columnIdx < sourceColumnMappedNames.size(); ++columnIdx) {
mappedColumnToSourceIdx.put(sourceColumnMappedNames.get(columnIdx), columnIdx);
}

// generate columnRefMap (fill default value)
Map<ColumnRefOperator, ScalarOperator> columnRefMap = new HashMap<>();
for (int columnIdx = 0; columnIdx < outputBaseSchema.size(); ++columnIdx) {
if (needToSkip(insertStatement, columnIdx)) {
continue;
Expand All @@ -603,40 +627,35 @@ private OptExprBuilder fillDefaultValue(LogicalPlan logicalPlan, ColumnRefFactor
if (targetColumn.isGeneratedColumn()) {
continue;
}
if (insertStatement.getTargetColumnNames() == null) {
outputColumns.add(logicalPlan.getOutputColumn().get(columnIdx));
columnRefMap.put(logicalPlan.getOutputColumn().get(columnIdx),
logicalPlan.getOutputColumn().get(columnIdx));

String targetColumnName = targetColumn.getName();
if (mappedColumnToSourceIdx.containsKey(targetColumnName) && targetColumnNames.contains(targetColumnName)) {
ColumnRefOperator col = logicalPlan.getOutputColumn().get(mappedColumnToSourceIdx.get(targetColumnName));
outputColumns.add(col);
columnRefMap.put(col, col);
} else {
int idx = insertStatement.getTargetColumnNames().indexOf(targetColumn.getName().toLowerCase());
if (idx == -1) {
ScalarOperator scalarOperator;
Column.DefaultValueType defaultValueType = targetColumn.getDefaultValueType();
if (defaultValueType == Column.DefaultValueType.NULL || targetColumn.isAutoIncrement()) {
scalarOperator = ConstantOperator.createNull(targetColumn.getType());
} else if (defaultValueType == Column.DefaultValueType.CONST) {
scalarOperator = ConstantOperator.createVarchar(targetColumn.calculatedDefaultValue());
} else if (defaultValueType == Column.DefaultValueType.VARY) {
if (SUPPORTED_DEFAULT_FNS.contains(targetColumn.getDefaultExpr().getExpr())) {
scalarOperator = SqlToScalarOperatorTranslator.
translate(targetColumn.getDefaultExpr().obtainExpr());
} else {
throw new SemanticException(
"Column:" + targetColumn.getName() + " has unsupported default value:"
+ targetColumn.getDefaultExpr().getExpr());
}
ScalarOperator scalarOperator;
Column.DefaultValueType defaultValueType = targetColumn.getDefaultValueType();
if (defaultValueType == Column.DefaultValueType.NULL || targetColumn.isAutoIncrement()) {
scalarOperator = ConstantOperator.createNull(targetColumn.getType());
} else if (defaultValueType == Column.DefaultValueType.CONST) {
scalarOperator = ConstantOperator.createVarchar(targetColumn.calculatedDefaultValue());
} else if (defaultValueType == Column.DefaultValueType.VARY) {
if (SUPPORTED_DEFAULT_FNS.contains(targetColumn.getDefaultExpr().getExpr())) {
scalarOperator = SqlToScalarOperatorTranslator.
translate(targetColumn.getDefaultExpr().obtainExpr());
} else {
throw new SemanticException("Unknown default value type:%s", defaultValueType.toString());
throw new SemanticException("Column:" + targetColumnName + " has unsupported default value:"
+ targetColumn.getDefaultExpr().getExpr());
}
ColumnRefOperator col = columnRefFactory
.create(scalarOperator, scalarOperator.getType(), scalarOperator.isNullable());

outputColumns.add(col);
columnRefMap.put(col, scalarOperator);
} else {
outputColumns.add(logicalPlan.getOutputColumn().get(idx));
columnRefMap.put(logicalPlan.getOutputColumn().get(idx), logicalPlan.getOutputColumn().get(idx));
throw new SemanticException("Unknown default value type:%s", defaultValueType.toString());
}
ColumnRefOperator col = columnRefFactory
.create(scalarOperator, scalarOperator.getType(), scalarOperator.isNullable());

outputColumns.add(col);
columnRefMap.put(col, scalarOperator);
}
}
return logicalPlan.getRootBuilder().withNewRoot(new LogicalProjectOperator(new HashMap<>(columnRefMap)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.starrocks.sql.ast.DefaultValueExpr;
import com.starrocks.sql.ast.FileTableFunctionRelation;
import com.starrocks.sql.ast.InsertStmt;
import com.starrocks.sql.ast.InsertStmt.ColumnMatchPolicy;
import com.starrocks.sql.ast.LoadStmt;
import com.starrocks.sql.ast.PartitionNames;
import com.starrocks.sql.ast.QueryRelation;
Expand Down Expand Up @@ -213,13 +214,11 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext
if (insertStmt.getTargetColumnNames() == null) {
if (table instanceof OlapTable) {
targetColumns = new ArrayList<>(((OlapTable) table).getBaseSchemaWithoutGeneratedColumn());
mentionedColumns =
((OlapTable) table).getBaseSchemaWithoutGeneratedColumn().stream()
.map(Column::getName).collect(Collectors.toSet());
mentionedColumns.addAll(((OlapTable) table).getBaseSchemaWithoutGeneratedColumn().stream().map(Column::getName)
.collect(Collectors.toSet()));
} else {
targetColumns = new ArrayList<>(table.getBaseSchema());
mentionedColumns =
table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet());
mentionedColumns.addAll(table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet()));
}
} else {
targetColumns = new ArrayList<>();
Expand All @@ -235,7 +234,7 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext
throw new SemanticException("generated column '%s' can not be specified", colName);
}
if (!mentionedColumns.add(colName)) {
throw new SemanticException("Column '%s' specified twice", colName);
ErrorReport.reportSemanticException(ErrorCode.ERR_DUP_FIELDNAME, colName);
}
requiredKeyColumns.remove(colName.toLowerCase());
targetColumns.add(column);
Expand Down Expand Up @@ -274,13 +273,40 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext
if ((table.isIcebergTable() || table.isHiveTable()) && insertStmt.isStaticKeyPartitionInsert()) {
// full column size = mentioned column size + partition column size for static partition insert
mentionedColumnSize -= table.getPartitionColumnNames().size();
mentionedColumns.removeAll(table.getPartitionColumnNames());
}

// check target and source columns match
QueryRelation query = insertStmt.getQueryStatement().getQueryRelation();
if (query.getRelationFields().size() != mentionedColumnSize) {
ErrorReport.reportSemanticException(ErrorCode.ERR_INSERTED_COLUMN_MISMATCH, mentionedColumnSize,
query.getRelationFields().size());
if (insertStmt.isColumnMatchByPosition()) {
if (query.getRelationFields().size() != mentionedColumnSize) {
ErrorReport.reportSemanticException(ErrorCode.ERR_INSERT_COLUMN_COUNT_MISMATCH, mentionedColumnSize,
query.getRelationFields().size());
}
} else {
Preconditions.checkState(insertStmt.isColumnMatchByName());
if (query instanceof ValuesRelation) {
throw new SemanticException("Insert match column by name does not support values()");
}

Set<String> selectColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
for (String colName : insertStmt.getQueryStatement().getQueryRelation().getColumnOutputNames()) {
if (!selectColumnNames.add(colName)) {
ErrorReport.reportSemanticException(ErrorCode.ERR_DUP_FIELDNAME, colName);
}
}
if (!selectColumnNames.containsAll(mentionedColumns)) {
mentionedColumns.removeAll(selectColumnNames);
ErrorReport.reportSemanticException(
ErrorCode.ERR_INSERT_COLUMN_NAME_MISMATCH, "Target", String.join(", ", mentionedColumns), "source");
}
if (!mentionedColumns.containsAll(selectColumnNames)) {
selectColumnNames.removeAll(mentionedColumns);
ErrorReport.reportSemanticException(
ErrorCode.ERR_INSERT_COLUMN_NAME_MISMATCH, "Source", String.join(", ", selectColumnNames), "target");
}
}

// check default value expr
if (query instanceof ValuesRelation) {
ValuesRelation valuesRelation = (ValuesRelation) query;
Expand Down Expand Up @@ -308,15 +334,29 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext

private static void analyzeProperties(InsertStmt insertStmt, ConnectContext session) {
Map<String, String> properties = insertStmt.getProperties();

// column match by related properties
// parse the property and remove it for 'LoadStmt.checkProperties' validation
if (properties.containsKey(InsertStmt.PROPERTY_MATCH_COLUMN_BY)) {
String property = properties.remove(InsertStmt.PROPERTY_MATCH_COLUMN_BY);
ColumnMatchPolicy columnMatchPolicy = ColumnMatchPolicy.fromString(property);
if (columnMatchPolicy == null) {
String msg = String.format("%s (case insensitive)", String.join(", ", ColumnMatchPolicy.getCandidates()));
ErrorReport.reportSemanticException(
ErrorCode.ERR_INVALID_VALUE, InsertStmt.PROPERTY_MATCH_COLUMN_BY, property, msg);
}
insertStmt.setColumnMatchPolicy(columnMatchPolicy);
}

// check common properties
// use session variable if not set max_filter_ratio property
if (!properties.containsKey(LoadStmt.MAX_FILTER_RATIO_PROPERTY)) {
properties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY,
String.valueOf(session.getSessionVariable().getInsertMaxFilterRatio()));
}
// use session variable if not set strict_mode property
if (!properties.containsKey(LoadStmt.STRICT_MODE) &&
session.getSessionVariable().getEnableInsertStrict()) {
properties.put(LoadStmt.STRICT_MODE, "true");
if (!properties.containsKey(LoadStmt.STRICT_MODE)) {
properties.put(LoadStmt.STRICT_MODE, String.valueOf(session.getSessionVariable().getEnableInsertStrict()));
}

try {
Expand Down
35 changes: 35 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/sql/ast/InsertStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.starrocks.sql.parser.NodePosition;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -53,6 +54,7 @@
*/
public class InsertStmt extends DmlStmt {
public static final String STREAMING = "STREAMING";
public static final String PROPERTY_MATCH_COLUMN_BY = "match_column_by";

private final TableName tblName;
private PartitionNames targetPartitionNames;
Expand Down Expand Up @@ -96,6 +98,9 @@ public class InsertStmt extends DmlStmt {

private boolean isVersionOverwrite = false;

// column match by position or name
private ColumnMatchPolicy columnMatchPolicy = ColumnMatchPolicy.POSITION;

public InsertStmt(TableName tblName, PartitionNames targetPartitionNames, String label, List<String> cols,
QueryStatement queryStatement, boolean isOverwrite, Map<String, String> insertProperties,
NodePosition pos) {
Expand Down Expand Up @@ -328,4 +333,34 @@ public Table makeTableFunctionTable(SessionVariable sessionVariable) {
List<Column> columns = collectSelectedFieldsFromQueryStatement();
return new TableFunctionTable(columns, getTableFunctionProperties(), sessionVariable);
}

public enum ColumnMatchPolicy {
POSITION,
NAME;

public static ColumnMatchPolicy fromString(String value) {
for (ColumnMatchPolicy policy : values()) {
if (policy.name().equalsIgnoreCase(value)) {
return policy;
}
}
return null;
}

public static List<String> getCandidates() {
return Arrays.stream(values()).map(p -> p.name().toLowerCase()).collect(Collectors.toList());
}
}

public boolean isColumnMatchByPosition() {
return columnMatchPolicy == ColumnMatchPolicy.POSITION;
}

public boolean isColumnMatchByName() {
return columnMatchPolicy == ColumnMatchPolicy.NAME;
}

public void setColumnMatchPolicy(ColumnMatchPolicy columnMatchPolicy) {
this.columnMatchPolicy = columnMatchPolicy;
}
}
Loading

0 comments on commit ad739d2

Please sign in to comment.