Skip to content

Commit

Permalink
[fix][dingo-executor] Check ddl sql type
Browse files Browse the repository at this point in the history
  • Loading branch information
guojn1 authored and githubgxll committed Nov 12, 2024
1 parent 0c2f42c commit abc198a
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 153 deletions.
204 changes: 108 additions & 96 deletions dingo-calcite/src/main/java/io/dingodb/calcite/DingoDdlExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import io.dingodb.common.table.TableDefinition;
import io.dingodb.common.tenant.TenantConstant;
import io.dingodb.common.type.DingoType;
import io.dingodb.common.type.DingoTypeFactory;
import io.dingodb.common.type.ListType;
import io.dingodb.common.type.MapType;
import io.dingodb.common.type.scalar.BooleanType;
Expand Down Expand Up @@ -312,7 +313,9 @@ public void execute(SqlDropTenant tenant, CalcitePrepare.Context context) {
for (Map.Entry<String, SchemaTables> entry : infoSchema.schemaMap.entrySet()) {
SchemaTables schemaTables = entry.getValue();
for (Map.Entry<String, Table> tableEntry : schemaTables.getTables().entrySet()) {
metaService.dropTable(tenantId, schemaTables.getSchemaInfo().getSchemaId(), tableEntry.getKey());
metaService.dropTable(
tenantId, schemaTables.getSchemaInfo().getSchemaId(), tableEntry.getKey()
);
}
}
}
Expand All @@ -328,8 +331,6 @@ public void execute(SqlCreateTable createT, CalcitePrepare.Context context) {
long start = System.currentTimeMillis();
DingoSqlCreateTable create = (DingoSqlCreateTable) createT;
LogUtils.info(log, "DDL execute: {}", create.getOriginalCreateSql().toUpperCase());
String connId = (String) context.getDataContext().get("connId");
SubSnapshotSchema schema = getSnapShotSchema(create.name, context, false);
SqlNodeList columnList = create.columnList;
if (columnList == null) {
throw SqlUtil.newContextException(create.name.getParserPosition(),
Expand Down Expand Up @@ -406,6 +407,7 @@ public void execute(SqlCreateTable createT, CalcitePrepare.Context context) {
if (distinctColCnt != realColCnt) {
throw DINGO_RESOURCE.duplicateColumn().ex();
}
SubSnapshotSchema schema = getSnapShotSchema(create.name, context, false);
if (schema == null) {
if (context.getDefaultSchemaPath() != null && !context.getDefaultSchemaPath().isEmpty()) {
throw DINGO_RESOURCE.unknownSchema(context.getDefaultSchemaPath().get(0)).ex();
Expand Down Expand Up @@ -454,7 +456,10 @@ public void execute(SqlCreateTable createT, CalcitePrepare.Context context) {

tableDefinition.setIndices(indexTableDefinitions);
DdlService ddlService = DdlService.root();
ddlService.createTableWithInfo(schema.getSchemaName(), tableName, tableDefinition, connId, create.getOriginalCreateSql());
String connId = (String) context.getDataContext().get("connId");
ddlService.createTableWithInfo(
schema.getSchemaName(), tableName, tableDefinition, connId, create.getOriginalCreateSql()
);

RootCalciteSchema rootCalciteSchema = (RootCalciteSchema) context.getMutableRootSchema();
RootSnapshotSchema rootSnapshotSchema = (RootSnapshotSchema) rootCalciteSchema.schema;
Expand All @@ -475,29 +480,10 @@ public void execute(SqlCreateTable createT, CalcitePrepare.Context context) {
}
}

/**
 * Verifies that a table which declares secondary indices uses a
 * transactional storage engine, and that every index does as well.
 *
 * <p>Tables without indices are accepted regardless of engine.
 *
 * @param indexTableDefinitions indices declared on the table; may be empty
 * @param engine                the table-level engine name; may be null/empty
 * @throws IllegalArgumentException if the table or any index uses a
 *                                  non-transactional engine
 */
private static void validateEngine(List<IndexDefinition> indexTableDefinitions, String engine) {
    if (indexTableDefinitions.isEmpty()) {
        // No indices: any engine is allowed.
        return;
    }
    if (isNotTxnEngine(engine)) {
        throw new IllegalArgumentException("Table with index, the engine must be transactional.");
    }
    // Every index engine must also be transactional; fail on the first violation.
    for (IndexDefinition index : indexTableDefinitions) {
        if (isNotTxnEngine(index.getEngine())) {
            throw new IllegalArgumentException("Index [" + index.getName() + "] engine must be transactional.");
        }
    }
}

/**
 * Reports whether the given engine name is explicitly non-transactional.
 *
 * @param engine engine name; null or empty means "unspecified"
 * @return true only when an engine is specified and its upper-cased name
 *         does not start with {@code "TXN"}
 */
private static boolean isNotTxnEngine(String engine) {
    if (engine == null || engine.isEmpty()) {
        // Unspecified engine is not treated as non-transactional.
        return false;
    }
    return !engine.toUpperCase().startsWith("TXN");
}

@SuppressWarnings({"unused", "MethodMayBeStatic"})
public void execute(SqlDropTable drop, CalcitePrepare.Context context) throws Exception {
final Timer.Context timeCtx = DingoMetrics.getTimeContext("dropTable");
long start = System.currentTimeMillis();
LogUtils.info(log, "DDL execute: {}", drop);
String connId = (String) context.getDataContext().get("connId");
final SubSnapshotSchema schema = getSnapShotSchema(drop.name, context, drop.ifExists);
if (schema == null) {
return;
Expand All @@ -516,6 +502,7 @@ public void execute(SqlDropTable drop, CalcitePrepare.Context context) throws Ex
}
}
DdlService ddlService = DdlService.root();
String connId = (String) context.getDataContext().get("connId");
ddlService.dropTable(schemaInfo, table.tableId.seq, tableName, connId);

RootCalciteSchema rootCalciteSchema = (RootCalciteSchema) context.getMutableRootSchema();
Expand Down Expand Up @@ -674,7 +661,8 @@ public void execute(@NonNull SqlCreateUser sqlCreateUser, CalcitePrepare.Context
throw DINGO_RESOURCE.createUserFailed(sqlCreateUser.user, sqlCreateUser.host).ex();
} else {
userDefinition.setPlugin(sqlCreateUser.plugin);
if ("dingo_ldap".equalsIgnoreCase(sqlCreateUser.plugin) && StringUtils.isNoneBlank(sqlCreateUser.pluginDn)) {
if ("dingo_ldap".equalsIgnoreCase(sqlCreateUser.plugin)
&& StringUtils.isNoneBlank(sqlCreateUser.pluginDn)) {
userDefinition.setLdapUser(sqlCreateUser.pluginDn);
}
userDefinition.setRequireSsl(sqlCreateUser.requireSsl);
Expand Down Expand Up @@ -750,42 +738,6 @@ public void execute(@NonNull SqlAlterTableDistribution sqlAlterTableDistribution
timeCtx.stop();
}

/**
 * Converts a catalog {@link Table} snapshot into a {@link TableDefinition},
 * copying every scalar attribute and mapping each catalog column through
 * {@link #fromColumn}.
 *
 * @param table the catalog table to convert
 * @return a new TableDefinition mirroring the given table
 */
private static TableDefinition fromTable(Table table) {
    return TableDefinition.builder()
        .name(table.name)
        .tableType(table.tableType)
        .updateTime(table.updateTime)
        .version(table.version)
        .engine(table.engine)
        .autoIncrement(table.autoIncrement)
        .charset(table.charset)
        .createSql(table.createSql)
        .replica(table.replica)
        .rowFormat(table.rowFormat)
        .createTime(table.createTime)
        .comment(table.comment)
        .collate(table.collate)
        // Each catalog Column is converted to a fresh ColumnDefinition.
        .columns(table.columns.stream().map(DingoDdlExecutor::fromColumn).collect(Collectors.toList()))
        .properties(table.properties)
        .build();
}

/**
 * Converts a catalog {@link Column} into a {@link ColumnDefinition},
 * copying type, state, default, nullability, precision/scale and the
 * primary-key position.
 *
 * @param column the catalog column to convert
 * @return a new ColumnDefinition mirroring the given column
 */
private static ColumnDefinition fromColumn(Column column) {
    return ColumnDefinition.builder()
        .name(column.name)
        .type(column.sqlTypeName)
        .state(column.state)
        .autoIncrement(column.autoIncrement)
        // For collection types this carries the element type name.
        .elementType(column.elementTypeName)
        .comment(column.comment)
        .defaultValue(column.defaultValueExpr)
        .nullable(column.isNullable())
        .precision(column.precision)
        .primary(column.primaryKeyIndex)
        .scale(column.scale)
        .build();
}

public void execute(@NonNull SqlAlterAddIndex sqlAlterAddIndex, CalcitePrepare.Context context) {
final Pair<SubSnapshotSchema, String> schemaTableName
= getSchemaAndTableName(sqlAlterAddIndex.table, context);
Expand Down Expand Up @@ -985,38 +937,6 @@ public void execute(SqlAlterDropColumn sqlAlterDropColumn, CalcitePrepare.Contex
LogUtils.info(log, "drop column done, tableName:{}, column:{}", tableName, sqlAlterDropColumn.columnNm);
}

/**
 * Validates a DROP INDEX request: the table's engine must be transactional
 * and an index with the given name (case-insensitive) must exist.
 *
 * @param table     the table the index belongs to
 * @param indexName the index to drop
 * @throws IllegalArgumentException if the table engine is non-transactional
 * @throws RuntimeException         if no index with that name exists
 */
public static void validateDropIndex(DingoTable table, String indexName) {
    if (isNotTxnEngine(table.getTable().getEngine())) {
        throw new IllegalArgumentException("Drop index, the engine must be transactional.");
    }
    boolean indexExists = false;
    for (IndexTable existing : table.getIndexTableDefinitions()) {
        if (indexName.equalsIgnoreCase(existing.getName())) {
            indexExists = true;
            break;
        }
    }
    if (!indexExists) {
        throw new RuntimeException("The index " + indexName + " not exist.");
    }
}

/**
 * Validates a new index definition against an existing table: the index
 * engine must be transactional, the table must exist in the schema, the
 * index name must not collide (case-insensitively) with an existing index,
 * and a vector index must not target the same column as an existing one.
 *
 * @param schema    snapshot schema to resolve the table in
 * @param tableName name of the table receiving the index
 * @param index     the candidate index definition
 * @throws IllegalArgumentException if the index engine is non-transactional
 * @throws RuntimeException         on duplicate name or duplicate vector column
 */
public static void validateIndex(SubSnapshotSchema schema, String tableName, TableDefinition index) {
    // Secondary indices are only supported on transactional engines.
    if (isNotTxnEngine(index.getEngine())) {
        throw new IllegalArgumentException("the index engine must be transactional.");
    }
    DingoTable table = schema.getTable(tableName);
    if (table == null) {
        throw DINGO_RESOURCE.tableNotExists(tableName).ex();
    }
    String indexName = index.getName();

    for (IndexTable existIndex : table.getIndexTableDefinitions()) {
        String name = existIndex.getName();
        // Index names are compared case-insensitively.
        if (indexName.equalsIgnoreCase(name)) {
            throw new RuntimeException("The index " + indexName + " already exist.");
        }

        // NOTE(review): assumes column index 1 is the vector payload column for
        // vector indices — confirm against the index column layout; get(1)
        // would throw if an index ever has fewer than two columns.
        if ("vector".equalsIgnoreCase(index.getProperties().getProperty("indexType"))
            && existIndex.getColumns().get(1).equals(index.getColumn(1))) {
            throw new RuntimeException("The vector index column same of " + existIndex.getName());
        }
    }
}

public void execute(@NonNull SqlAlterUser sqlAlterUser, CalcitePrepare.Context context) {
final Timer.Context timeCtx = DingoMetrics.getTimeContext("alterUser");
LogUtils.info(log, "DDL execute: {}", sqlAlterUser.toLog());
Expand Down Expand Up @@ -1259,7 +1179,9 @@ private static void validateAddColumn(ColumnDefinition newColumn) {

}

private static List<IndexDefinition> getIndexDefinitions(DingoSqlCreateTable create, TableDefinition tableDefinition) {
private static List<IndexDefinition> getIndexDefinitions(
DingoSqlCreateTable create, TableDefinition tableDefinition
) {
assert create.columnList != null;
List<IndexDefinition> tableDefList = create.columnList.stream()
.filter(col -> col.getKind() == SqlKind.UNIQUE)
Expand Down Expand Up @@ -1391,7 +1313,6 @@ private static IndexDefinition fromSqlIndexDeclaration(

// Primary key list
List<String> columns = indexDeclaration.columnList;
List<String> originKeyList = new ArrayList<>(columns);

int keySize = columns.size();
List<ColumnDefinition> keyColumns = tableDefinition.getKeyColumns();
Expand All @@ -1400,7 +1321,11 @@ private static IndexDefinition fromSqlIndexDeclaration(
.sorted(Comparator.comparingInt(ColumnDefinition::getPrimary))
.map(ColumnDefinition::getName)
.map(String::toUpperCase)
.peek(__ -> { if (columns.contains(__)) num.getAndIncrement(); })
.peek(__ -> {
if (columns.contains(__)) {
num.getAndIncrement();
}
})
.filter(__ -> !columns.contains(__))
.forEach(columns::add);

Expand Down Expand Up @@ -1589,8 +1514,10 @@ private static IndexDefinition fromSqlIndexDeclaration(
indexColumnDefinitions.add(indexColumnDefinition);
}
}
List<String> originKeyList = new ArrayList<>(columns);
IndexDefinition indexTableDefinition = IndexDefinition.createIndexDefinition(
indexDeclaration.index, tableDefinition, indexDeclaration.unique, originKeyList, indexDeclaration.withColumnList
indexDeclaration.index, tableDefinition,
indexDeclaration.unique, originKeyList, indexDeclaration.withColumnList
);
indexTableDefinition.setColumns(indexColumnDefinitions);
indexTableDefinition.setPartDefinition(indexDeclaration.getPartDefinition());
Expand Down Expand Up @@ -1742,4 +1669,89 @@ private static SubSnapshotSchema getSnapShotSchema(
return Pair.of(getSnapShotSchema(id, context, false), getTableName(id));
}

/**
 * Checks that a table declaring indices, and each of its indices, runs on a
 * transactional engine. Tables without indices pass unconditionally.
 *
 * @param indexTableDefinitions indices declared on the table
 * @param engine                table-level engine name; may be null/empty
 * @throws IllegalArgumentException when the table or any index engine is
 *                                  non-transactional
 */
private static void validateEngine(List<IndexDefinition> indexTableDefinitions, String engine) {
    if (indexTableDefinitions.isEmpty()) {
        return;
    }
    if (isNotTxnEngine(engine)) {
        throw new IllegalArgumentException("Table with index, the engine must be transactional.");
    }
    for (IndexDefinition indexDef : indexTableDefinitions) {
        // Reject the first index whose engine is not transactional.
        if (isNotTxnEngine(indexDef.getEngine())) {
            throw new IllegalArgumentException("Index [" + indexDef.getName() + "] engine must be transactional.");
        }
    }
}

/**
 * Tells whether an explicitly specified engine name is non-transactional.
 *
 * @param engine engine name; null/empty counts as unspecified
 * @return true when a name is present and does not start with "TXN"
 *         (case-insensitive)
 */
private static boolean isNotTxnEngine(String engine) {
    boolean engineSpecified = engine != null && !engine.isEmpty();
    return engineSpecified && !engine.toUpperCase().startsWith("TXN");
}

/**
 * Builds a {@link TableDefinition} from a catalog {@link Table} snapshot,
 * copying scalar attributes one-to-one and converting columns via
 * {@link #fromColumn}.
 *
 * @param table the catalog table to convert
 * @return a new TableDefinition mirroring the given table
 */
private static TableDefinition fromTable(Table table) {
    return TableDefinition.builder()
        .name(table.name)
        .tableType(table.tableType)
        .updateTime(table.updateTime)
        .version(table.version)
        .engine(table.engine)
        .autoIncrement(table.autoIncrement)
        .charset(table.charset)
        .createSql(table.createSql)
        .replica(table.replica)
        .rowFormat(table.rowFormat)
        .createTime(table.createTime)
        .comment(table.comment)
        .collate(table.collate)
        // Columns are mapped individually to ColumnDefinition instances.
        .columns(table.columns.stream().map(DingoDdlExecutor::fromColumn).collect(Collectors.toList()))
        .properties(table.properties)
        .build();
}

/**
 * Builds a {@link ColumnDefinition} from a catalog {@link Column},
 * preserving type, state, auto-increment flag, default expression,
 * nullability, precision/scale and primary-key position.
 *
 * @param column the catalog column to convert
 * @return a new ColumnDefinition mirroring the given column
 */
private static ColumnDefinition fromColumn(Column column) {
    return ColumnDefinition.builder()
        .name(column.name)
        .type(column.sqlTypeName)
        .state(column.state)
        .autoIncrement(column.autoIncrement)
        // Element type only applies to collection-typed columns.
        .elementType(column.elementTypeName)
        .comment(column.comment)
        .defaultValue(column.defaultValueExpr)
        .nullable(column.isNullable())
        .precision(column.precision)
        .primary(column.primaryKeyIndex)
        .scale(column.scale)
        .build();
}

/**
 * Validates a DROP INDEX request: requires a transactional table engine and
 * an existing index whose name matches case-insensitively.
 *
 * @param table     the table owning the index
 * @param indexName the index name to drop
 * @throws IllegalArgumentException if the table engine is non-transactional
 * @throws RuntimeException         if the named index does not exist
 */
public static void validateDropIndex(DingoTable table, String indexName) {
    if (isNotTxnEngine(table.getTable().getEngine())) {
        throw new IllegalArgumentException("Drop index, the engine must be transactional.");
    }
    boolean exists = table.getIndexTableDefinitions().stream()
        .map(IndexTable::getName)
        .anyMatch(indexName::equalsIgnoreCase);
    if (!exists) {
        throw new RuntimeException("The index " + indexName + " not exist.");
    }
}

/**
 * Validates that a new index can be created on the named table: the index
 * engine must be transactional, the table must exist, the name must be
 * unique (case-insensitive), and vector indices must not reuse the column
 * of an existing vector index.
 *
 * @param schema    snapshot schema used to resolve the table
 * @param tableName the table to index
 * @param index     the candidate index definition
 * @throws IllegalArgumentException when the index engine is non-transactional
 * @throws RuntimeException         on name collision or duplicate vector column
 */
public static void validateIndex(SubSnapshotSchema schema, String tableName, TableDefinition index) {
    // Indices require a transactional storage engine.
    if (isNotTxnEngine(index.getEngine())) {
        throw new IllegalArgumentException("the index engine must be transactional.");
    }
    DingoTable table = schema.getTable(tableName);
    if (table == null) {
        throw DINGO_RESOURCE.tableNotExists(tableName).ex();
    }
    String indexName = index.getName();

    for (IndexTable existIndex : table.getIndexTableDefinitions()) {
        String name = existIndex.getName();
        // Names are compared case-insensitively for uniqueness.
        if (indexName.equalsIgnoreCase(name)) {
            throw new RuntimeException("The index " + indexName + " already exist.");
        }

        // NOTE(review): position 1 appears to be the vector payload column —
        // confirm the column layout; get(1) assumes at least two columns.
        if ("vector".equalsIgnoreCase(index.getProperties().getProperty("indexType"))
            && existIndex.getColumns().get(1).equals(index.getColumn(1))) {
            throw new RuntimeException("The vector index column same of " + existIndex.getName());
        }
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public final class DingoTypeFactory {

private DingoTypeFactory() {
scalarGenerators = new TreeMap<>(String::compareToIgnoreCase);
scalarGenerators.put("TINYINT", IntegerType::new);
scalarGenerators.put("INT", IntegerType::new);
scalarGenerators.put("INTEGER", IntegerType::new);
scalarGenerators.put("LONG", LongType::new);
Expand Down Expand Up @@ -82,6 +83,7 @@ private DingoTypeFactory() {

private static String typeNameOfSqlTypeId(int typeId) {
switch (typeId) {
case Types.TINYINT:
case Types.INTEGER:
return "INT";
case Types.BIGINT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,6 @@ public static Pair<Boolean, Long> doReorgWorkForAddCol(
//if (res.getValue() != null) {
// error = res.getValue();
//}
String error1 = JobTableUtil.removeDDLReorgHandle(session, job.getId(), reorgInfo.getElements());
if (error1 != null) {
LogUtils.warn(log, "[ddl] run add index job failed, convert job to rollback, "
+ "RemoveDDLReorgHandle failed, jobId:{}, error:{}", job.getId(), error1);
}
}
throw new RuntimeException(error);
}
Expand Down Expand Up @@ -149,25 +144,6 @@ public static Pair<Boolean, Long> doReorgWorkForDropCol(
p -> addReplicaTable(reorgInfoRes.getKey(), BackFilling.typeDropColumnWorker)
);
if (error != null) {
if ("ErrWaitReorgTimeout".equalsIgnoreCase(error)) {
return Pair.of(false, 0L);
}
if ("ErrKeyExists".equalsIgnoreCase(error)
|| "ErrCancelledDDLJob".equalsIgnoreCase(error)
|| "ErrCantDecodeRecord".equalsIgnoreCase(error)
) {
LogUtils.warn(log, "[ddl] run add index job failed, convert job to rollback, "
+ "jobId:{}, error:{}", job.getId(), error);
//Pair<Long, String> res = RollingBackUtil.convertAddIdxJob2RollbackJob(dc, job, replicaTable);
//if (res.getValue() != null) {
// error = res.getValue();
//}
String error1 = JobTableUtil.removeDDLReorgHandle(session, job.getId(), reorgInfo.getElements());
if (error1 != null) {
LogUtils.warn(log, "[ddl] run add index job failed, convert job to rollback, "
+ "RemoveDDLReorgHandle failed, jobId:{}, error:{}", job.getId(), error1);
}
}
throw new RuntimeException(error);
}
return Pair.of(true, 0L);
Expand Down
Loading

0 comments on commit abc198a

Please sign in to comment.