-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Enhancement] Support insert match column by name #51181
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -320,7 +320,7 @@ public enum ErrorCode { | |
"No partitions have data available for loading. If you are sure there may be no data to be loaded, " + | ||
"you can use `ADMIN SET FRONTEND CONFIG ('empty_load_as_error' = 'false')` " + | ||
"to ensure such load jobs can succeed"), | ||
ERR_INSERTED_COLUMN_MISMATCH(5604, new byte[] {'2', '2', '0', '0', '0'}, | ||
ERR_INSERT_COLUMN_COUNT_MISMATCH(5604, new byte[] {'2', '2', '0', '0', '0'}, | ||
"Inserted target column count: %d doesn't match select/value column count: %d"), | ||
ERR_ILLEGAL_BYTES_LENGTH(5605, new byte[] {'4', '2', '0', '0', '0'}, "The valid bytes length for '%s' is [%d, %d]"), | ||
ERR_TOO_MANY_ERROR_ROWS(5606, new byte[] {'2', '2', '0', '0', '0'}, | ||
|
@@ -329,6 +329,7 @@ public enum ErrorCode { | |
ERR_ROUTINE_LOAD_OFFSET_INVALID(5607, new byte[] {'0', '2', '0', '0', '0'}, | ||
"Consume offset: %d is greater than the latest offset: %d in kafka partition: %d. " + | ||
"You can modify 'kafka_offsets' property through ALTER ROUTINE LOAD and RESUME the job"), | ||
ERR_INSERT_COLUMN_NAME_MISMATCH(5608, new byte[] {'2', '2', '0', '0', '0'}, "%s column: %s has no matching %s column"), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 42601? |
||
|
||
/** | ||
* 5700 - 5799: Partition | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,7 @@ | |
import com.starrocks.sql.ast.DefaultValueExpr; | ||
import com.starrocks.sql.ast.FileTableFunctionRelation; | ||
import com.starrocks.sql.ast.InsertStmt; | ||
import com.starrocks.sql.ast.InsertStmt.ColumnMatchPolicy; | ||
import com.starrocks.sql.ast.LoadStmt; | ||
import com.starrocks.sql.ast.PartitionNames; | ||
import com.starrocks.sql.ast.QueryRelation; | ||
|
@@ -213,13 +214,11 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext | |
if (insertStmt.getTargetColumnNames() == null) { | ||
if (table instanceof OlapTable) { | ||
targetColumns = new ArrayList<>(((OlapTable) table).getBaseSchemaWithoutGeneratedColumn()); | ||
mentionedColumns = | ||
((OlapTable) table).getBaseSchemaWithoutGeneratedColumn().stream() | ||
.map(Column::getName).collect(Collectors.toSet()); | ||
mentionedColumns.addAll(((OlapTable) table).getBaseSchemaWithoutGeneratedColumn().stream().map(Column::getName) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you tell me the reason to use addAll()? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
.collect(Collectors.toSet())); | ||
} else { | ||
targetColumns = new ArrayList<>(table.getBaseSchema()); | ||
mentionedColumns = | ||
table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet()); | ||
mentionedColumns.addAll(table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet())); | ||
} | ||
} else { | ||
targetColumns = new ArrayList<>(); | ||
|
@@ -235,7 +234,7 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext | |
throw new SemanticException("generated column '%s' can not be specified", colName); | ||
} | ||
if (!mentionedColumns.add(colName)) { | ||
throw new SemanticException("Column '%s' specified twice", colName); | ||
ErrorReport.reportSemanticException(ErrorCode.ERR_DUP_FIELDNAME, colName); | ||
} | ||
requiredKeyColumns.remove(colName.toLowerCase()); | ||
targetColumns.add(column); | ||
|
@@ -274,13 +273,40 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext | |
if ((table.isIcebergTable() || table.isHiveTable()) && insertStmt.isStaticKeyPartitionInsert()) { | ||
// full column size = mentioned column size + partition column size for static partition insert | ||
mentionedColumnSize -= table.getPartitionColumnNames().size(); | ||
mentionedColumns.removeAll(table.getPartitionColumnNames()); | ||
} | ||
|
||
// check target and source columns match | ||
QueryRelation query = insertStmt.getQueryStatement().getQueryRelation(); | ||
if (query.getRelationFields().size() != mentionedColumnSize) { | ||
ErrorReport.reportSemanticException(ErrorCode.ERR_INSERTED_COLUMN_MISMATCH, mentionedColumnSize, | ||
query.getRelationFields().size()); | ||
if (insertStmt.isColumnMatchByPosition()) { | ||
if (query.getRelationFields().size() != mentionedColumnSize) { | ||
ErrorReport.reportSemanticException(ErrorCode.ERR_INSERT_COLUMN_COUNT_MISMATCH, mentionedColumnSize, | ||
query.getRelationFields().size()); | ||
} | ||
} else { | ||
Preconditions.checkState(insertStmt.isColumnMatchByName()); | ||
if (query instanceof ValuesRelation) { | ||
throw new SemanticException("Insert match column by name does not support values()"); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why not use ErrorReport.reportSemanticException(42601, ...)? |
||
} | ||
|
||
Set<String> selectColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); | ||
for (String colName : insertStmt.getQueryStatement().getQueryRelation().getColumnOutputNames()) { | ||
if (!selectColumnNames.add(colName)) { | ||
ErrorReport.reportSemanticException(ErrorCode.ERR_DUP_FIELDNAME, colName); | ||
} | ||
} | ||
if (!selectColumnNames.containsAll(mentionedColumns)) { | ||
mentionedColumns.removeAll(selectColumnNames); | ||
ErrorReport.reportSemanticException( | ||
ErrorCode.ERR_INSERT_COLUMN_NAME_MISMATCH, "Target", String.join(", ", mentionedColumns), "source"); | ||
} | ||
if (!mentionedColumns.containsAll(selectColumnNames)) { | ||
selectColumnNames.removeAll(mentionedColumns); | ||
ErrorReport.reportSemanticException( | ||
ErrorCode.ERR_INSERT_COLUMN_NAME_MISMATCH, "Source", String.join(", ", selectColumnNames), "target"); | ||
} | ||
} | ||
|
||
// check default value expr | ||
if (query instanceof ValuesRelation) { | ||
ValuesRelation valuesRelation = (ValuesRelation) query; | ||
|
@@ -308,15 +334,29 @@ public static void analyzeWithDeferredLock(InsertStmt insertStmt, ConnectContext | |
|
||
private static void analyzeProperties(InsertStmt insertStmt, ConnectContext session) { | ||
Map<String, String> properties = insertStmt.getProperties(); | ||
|
||
// column match by related properties | ||
// parse the property and remove it for 'LoadStmt.checkProperties' validation | ||
if (properties.containsKey(InsertStmt.PROPERTY_MATCH_COLUMN_BY)) { | ||
String property = properties.remove(InsertStmt.PROPERTY_MATCH_COLUMN_BY); | ||
ColumnMatchPolicy columnMatchPolicy = ColumnMatchPolicy.fromString(property); | ||
if (columnMatchPolicy == null) { | ||
String msg = String.format("%s (case insensitive)", String.join(", ", ColumnMatchPolicy.getCandidates())); | ||
ErrorReport.reportSemanticException( | ||
ErrorCode.ERR_INVALID_VALUE, InsertStmt.PROPERTY_MATCH_COLUMN_BY, property, msg); | ||
} | ||
insertStmt.setColumnMatchPolicy(columnMatchPolicy); | ||
} | ||
|
||
// check common properties | ||
// use session variable if not set max_filter_ratio property | ||
if (!properties.containsKey(LoadStmt.MAX_FILTER_RATIO_PROPERTY)) { | ||
properties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, | ||
String.valueOf(session.getSessionVariable().getInsertMaxFilterRatio())); | ||
} | ||
// use session variable if not set strict_mode property | ||
if (!properties.containsKey(LoadStmt.STRICT_MODE) && | ||
session.getSessionVariable().getEnableInsertStrict()) { | ||
properties.put(LoadStmt.STRICT_MODE, "true"); | ||
if (!properties.containsKey(LoadStmt.STRICT_MODE)) { | ||
properties.put(LoadStmt.STRICT_MODE, String.valueOf(session.getSessionVariable().getEnableInsertStrict())); | ||
} | ||
|
||
try { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is 42601 more suitable?