Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Catalog] Catalog add Case Conversion Definition #5328

Merged
merged 14 commits into from
Sep 12, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

@Getter
@EqualsAndHashCode
Expand Down Expand Up @@ -54,28 +56,52 @@ public static TablePath of(String databaseName, String schemaName, String tableN
}

public String getSchemaAndTableName() {
return String.format("%s.%s", schemaName, tableName);
return getNameCommon(null, schemaName, tableName, null, null);
}

public String getSchemaAndTableName(String quote) {
return getNameCommon(null, schemaName, tableName, quote, quote);
}

public String getFullName() {
if (schemaName == null) {
return String.format("%s.%s", databaseName, tableName);
}
return String.format("%s.%s.%s", databaseName, schemaName, tableName);
return getNameCommon(databaseName, schemaName, tableName, null, null);
}

public String getFullNameWithQuoted() {
return getFullNameWithQuoted("`");
}

public String getFullNameWithQuoted(String quote) {
if (schemaName == null) {
return String.format(
"%s%s%s.%s%s%s", quote, databaseName, quote, quote, tableName, quote);
return getNameCommon(databaseName, schemaName, tableName, quote, quote);
}

public String getFullNameWithQuoted(String quoteLeft, String quoteRight) {
return getNameCommon(databaseName, schemaName, tableName, quoteLeft, quoteRight);
}

private String getNameCommon(
String databaseName,
String schemaName,
String tableName,
String quoteLeft,
String quoteRight) {
List<String> joinList = new ArrayList<>();
quoteLeft = quoteLeft == null ? "" : quoteLeft;
quoteRight = quoteRight == null ? "" : quoteRight;

if (databaseName != null) {
joinList.add(quoteLeft + databaseName + quoteRight);
}

if (schemaName != null) {
joinList.add(quoteLeft + schemaName + quoteRight);
}
return String.format(
"%s%s%s.%s%s%s.%s%s%s",
quote, databaseName, quote, quote, schemaName, quote, quote, tableName, quote);

if (tableName != null) {
joinList.add(quoteLeft + tableName + quoteRight);
}

return String.join(".", joinList);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -55,6 +56,8 @@ public class MysqlCreateTableSqlBuilder {

private MysqlDataTypeConvertor mysqlDataTypeConvertor;

private String fieldIde;

private MysqlCreateTableSqlBuilder(String tableName) {
checkNotNull(tableName, "tableName must not be null");
this.tableName = tableName;
Expand All @@ -76,7 +79,8 @@ public static MysqlCreateTableSqlBuilder builder(
.charset(null)
.primaryKey(tableSchema.getPrimaryKey())
.constraintKeys(tableSchema.getConstraintKeys())
.addColumn(tableSchema.getColumns());
.addColumn(tableSchema.getColumns())
.fieldIde(catalogTable.getOptions().get("fieldIde"));
}

public MysqlCreateTableSqlBuilder addColumn(List<Column> columns) {
Expand All @@ -90,6 +94,11 @@ public MysqlCreateTableSqlBuilder primaryKey(PrimaryKey primaryKey) {
return this;
}

public MysqlCreateTableSqlBuilder fieldIde(String fieldIde) {
this.fieldIde = fieldIde;
return this;
}

public MysqlCreateTableSqlBuilder constraintKeys(List<ConstraintKey> constraintKeys) {
this.constraintKeys = constraintKeys;
return this;
Expand Down Expand Up @@ -120,7 +129,8 @@ public String build(String catalogName) {
sqls.add(
String.format(
"CREATE TABLE %s (\n%s\n)",
tableName, buildColumnsIdentifySql(catalogName)));
CatalogUtils.quoteIdentifier(tableName, fieldIde, "`"),
buildColumnsIdentifySql(catalogName)));
if (engine != null) {
sqls.add("ENGINE = " + engine);
}
Expand Down Expand Up @@ -157,7 +167,7 @@ private String buildColumnsIdentifySql(String catalogName) {

private String buildColumnIdentifySql(Column column, String catalogName) {
final List<String> columnSqls = new ArrayList<>();
columnSqls.add(column.getName());
columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`"));
if (StringUtils.equals(catalogName, "mysql")) {
columnSqls.add(column.getSourceType());
} else {
Expand Down Expand Up @@ -243,7 +253,7 @@ private String buildPrimaryKeySql() {
.map(columnName -> "`" + columnName + "`")
.collect(Collectors.joining(", "));
// add sort type
return String.format("PRIMARY KEY (%s)", key);
return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, fieldIde));
}

private String buildConstraintKeySql(ConstraintKey constraintKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;

import org.apache.commons.lang3.StringUtils;

Expand All @@ -36,23 +37,27 @@ public class OracleCreateTableSqlBuilder {
private PrimaryKey primaryKey;
private OracleDataTypeConvertor oracleDataTypeConvertor;
private String sourceCatalogName;
private String fieldIde;

public OracleCreateTableSqlBuilder(CatalogTable catalogTable) {
this.columns = catalogTable.getTableSchema().getColumns();
this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
this.oracleDataTypeConvertor = new OracleDataTypeConvertor();
this.sourceCatalogName = catalogTable.getCatalogName();
this.fieldIde = catalogTable.getOptions().get("fieldIde");
}

public String build(TablePath tablePath) {
StringBuilder createTableSql = new StringBuilder();
createTableSql
.append("CREATE TABLE ")
.append(tablePath.getSchemaAndTableName())
.append(tablePath.getSchemaAndTableName("\""))
.append(" (\n");

List<String> columnSqls =
columns.stream().map(this::buildColumnSql).collect(Collectors.toList());
columns.stream()
.map(column -> CatalogUtils.getFieldIde(buildColumnSql(column), fieldIde))
.collect(Collectors.toList());

// Add primary key directly in the create table statement
if (primaryKey != null
Expand All @@ -70,7 +75,7 @@ public String build(TablePath tablePath) {
.map(
column ->
buildColumnCommentSql(
column, tablePath.getSchemaAndTableName()))
column, tablePath.getSchemaAndTableName("\"")))
.collect(Collectors.toList());

if (!commentSqls.isEmpty()) {
Expand All @@ -83,7 +88,7 @@ public String build(TablePath tablePath) {

private String buildColumnSql(Column column) {
StringBuilder columnSql = new StringBuilder();
columnSql.append(column.getName()).append(" ");
columnSql.append("\"").append(column.getName()).append("\" ");

String columnType =
sourceCatalogName.equals("oracle")
Expand All @@ -95,11 +100,6 @@ private String buildColumnSql(Column column) {
columnSql.append(" NOT NULL");
}

// if (column.getDefaultValue() != null) {
// columnSql.append(" DEFAULT
// '").append(column.getDefaultValue().toString()).append("'");
// }

return columnSql.toString();
}

Expand Down Expand Up @@ -140,29 +140,37 @@ private String buildColumnType(Column column) {

private String buildPrimaryKeySql(PrimaryKey primaryKey) {
String randomSuffix = UUID.randomUUID().toString().replace("-", "").substring(0, 4);
String columnNamesString = String.join(", ", primaryKey.getColumnNames());
String columnNamesString =
primaryKey.getColumnNames().stream()
.map(columnName -> "\"" + columnName + "\"")
.collect(Collectors.joining(", "));

// In Oracle database, the maximum length for an identifier is 30 characters.
String primaryKeyStr = primaryKey.getPrimaryKey();
if (primaryKeyStr.length() > 25) {
primaryKeyStr = primaryKeyStr.substring(0, 25);
}

return "CONSTRAINT "
+ primaryKeyStr
+ "_"
+ randomSuffix
+ " PRIMARY KEY ("
+ columnNamesString
+ ")";
return CatalogUtils.getFieldIde(
"CONSTRAINT "
+ primaryKeyStr
+ "_"
+ randomSuffix
+ " PRIMARY KEY ("
+ columnNamesString
+ ")",
fieldIde);
}

private String buildColumnCommentSql(Column column, String tableName) {
StringBuilder columnCommentSql = new StringBuilder();
columnCommentSql.append("COMMENT ON COLUMN ").append(tableName).append(".");
columnCommentSql
.append(column.getName())
.append(" IS '")
.append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde))
.append(tableName)
.append(".");
columnCommentSql
.append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\""))
.append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
.append(column.getComment())
.append("'");
return columnCommentSql.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;

import org.apache.commons.lang3.StringUtils;

Expand All @@ -37,23 +38,30 @@ public class PostgresCreateTableSqlBuilder {
private PrimaryKey primaryKey;
private PostgresDataTypeConvertor postgresDataTypeConvertor;
private String sourceCatalogName;
private String fieldIde;

public PostgresCreateTableSqlBuilder(CatalogTable catalogTable) {
this.columns = catalogTable.getTableSchema().getColumns();
this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
this.postgresDataTypeConvertor = new PostgresDataTypeConvertor();
this.sourceCatalogName = catalogTable.getCatalogName();
this.fieldIde = catalogTable.getOptions().get("fieldIde");
}

public String build(TablePath tablePath) {
StringBuilder createTableSql = new StringBuilder();
createTableSql
.append("CREATE TABLE ")
.append(tablePath.getSchemaAndTableName())
.append(CatalogUtils.quoteIdentifier("CREATE TABLE ", fieldIde))
.append(tablePath.getSchemaAndTableName("\""))
.append(" (\n");

List<String> columnSqls =
columns.stream().map(this::buildColumnSql).collect(Collectors.toList());
columns.stream()
.map(
column ->
CatalogUtils.quoteIdentifier(
buildColumnSql(column), fieldIde))
.collect(Collectors.toList());

createTableSql.append(String.join(",\n", columnSqls));
createTableSql.append("\n);");
Expand All @@ -64,7 +72,7 @@ public String build(TablePath tablePath) {
.map(
columns ->
buildColumnCommentSql(
columns, tablePath.getSchemaAndTableName()))
columns, tablePath.getSchemaAndTableName("\"")))
.collect(Collectors.toList());

if (!commentSqls.isEmpty()) {
Expand All @@ -77,7 +85,7 @@ public String build(TablePath tablePath) {

private String buildColumnSql(Column column) {
StringBuilder columnSql = new StringBuilder();
columnSql.append(column.getName()).append(" ");
columnSql.append("\"").append(column.getName()).append("\" ");

// For simplicity, assume the column type in SeaTunnelDataType is the same as in PostgreSQL
String columnType =
Expand All @@ -96,12 +104,6 @@ private String buildColumnSql(Column column) {
columnSql.append(" PRIMARY KEY");
}

// Add default value if exists
// if (column.getDefaultValue() != null) {
// columnSql.append(" DEFAULT
// '").append(column.getDefaultValue().toString()).append("'");
// }

return columnSql.toString();
}

Expand Down Expand Up @@ -133,10 +135,13 @@ private String buildColumnType(Column column) {

private String buildColumnCommentSql(Column column, String tableName) {
StringBuilder columnCommentSql = new StringBuilder();
columnCommentSql.append("COMMENT ON COLUMN ").append(tableName).append(".");
columnCommentSql
.append(column.getName())
.append(" IS '")
.append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde))
.append(tableName)
.append(".");
columnCommentSql
.append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\""))
.append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
.append(column.getComment())
.append("'");
return columnCommentSql.toString();
Expand Down
Loading
Loading