diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java index 02918f0f96d..4f70eaab65c 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java @@ -60,6 +60,7 @@ public interface StarRocksSinkOptions { .stringType() .defaultValue( "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n" + + "${rowtype_primary_key},\n" + "${rowtype_fields}\n" + ") ENGINE=OLAP\n" + " PRIMARY KEY (${rowtype_primary_key})\n" diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java index cb0d086859b..bbbc04eb20e 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java @@ -27,6 +27,8 @@ import org.apache.commons.lang3.StringUtils; +import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -82,8 +84,14 @@ private static String mergeColumnInTemplate( Map columnMap = tableSchema.getColumns().stream() .collect(Collectors.toMap(Column::getName, Function.identity())); - for (String col : columnInTemplate.keySet()) { - CreateTableParser.ColumnInfo columnInfo = columnInTemplate.get(col); + List columnInfosInSeq = + columnInTemplate.values().stream() + .sorted( + Comparator.comparingInt( + CreateTableParser.ColumnInfo::getStartIndex)) + .collect(Collectors.toList()); + for (CreateTableParser.ColumnInfo columnInfo : columnInfosInSeq) { + String col = columnInfo.getName(); if (StringUtils.isEmpty(columnInfo.getInfo())) { if (columnMap.containsKey(col)) { Column column = columnMap.get(col); diff --git a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCreateTableTest.java b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCreateTableTest.java index 22536ffd684..b571deb68ad 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCreateTableTest.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCreateTableTest.java @@ -22,8 +22,11 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSaveModeUtil; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -70,4 +73,83 @@ public void test() { System.out.println(result); } + + @Test + public void testInSeq() { + + List columns = new ArrayList<>(); + + columns.add(PhysicalColumn.of("L_ORDERKEY", BasicType.INT_TYPE, null, false, null, "")); + columns.add(PhysicalColumn.of("L_PARTKEY", BasicType.INT_TYPE, null, false, null, "")); + columns.add(PhysicalColumn.of("L_SUPPKEY", BasicType.INT_TYPE, null, false, null, "")); + columns.add(PhysicalColumn.of("L_LINENUMBER", BasicType.INT_TYPE, null, false, null, "")); + columns.add(PhysicalColumn.of("L_QUANTITY", new DecimalType(15, 2), null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_EXTENDEDPRICE", new DecimalType(15, 2), null, false, null, "")); + columns.add(PhysicalColumn.of("L_DISCOUNT", new DecimalType(15, 2), null, false, null, "")); + columns.add(PhysicalColumn.of("L_TAX", new DecimalType(15, 2), null, false, null, "")); + columns.add( + PhysicalColumn.of("L_RETURNFLAG", BasicType.STRING_TYPE, null, false, null, "")); + columns.add( + PhysicalColumn.of("L_LINESTATUS", BasicType.STRING_TYPE, null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_SHIPDATE", LocalTimeType.LOCAL_DATE_TYPE, null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_COMMITDATE", LocalTimeType.LOCAL_DATE_TYPE, null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_RECEIPTDATE", LocalTimeType.LOCAL_DATE_TYPE, null, false, null, "")); + columns.add( + PhysicalColumn.of("L_SHIPINSTRUCT", BasicType.STRING_TYPE, null, false, null, "")); + columns.add(PhysicalColumn.of("L_SHIPMODE", BasicType.STRING_TYPE, null, false, null, "")); + columns.add(PhysicalColumn.of("L_COMMENT", BasicType.STRING_TYPE, null, false, null, "")); + + String result = + StarRocksSaveModeUtil.fillingCreateSql( + "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n" + + "`L_COMMITDATE`,\n" + + "${rowtype_primary_key},\n" + + "L_SUPPKEY BIGINT NOT NULL,\n" + + "${rowtype_fields}\n" + + ") ENGINE=OLAP\n" + + " PRIMARY KEY (L_COMMITDATE, ${rowtype_primary_key}, L_SUPPKEY)\n" + + "DISTRIBUTED BY HASH (${rowtype_primary_key})" + + "PROPERTIES (\n" + + " \"replication_num\" = \"1\" \n" + + ")", + "tpch", + "lineitem", + TableSchema.builder() + .primaryKey( + PrimaryKey.of( + "", Arrays.asList("L_ORDERKEY", "L_LINENUMBER"))) + .columns(columns) + .build()); + String expected = + "CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n" + + "`L_COMMITDATE` DATE NOT NULL ,\n" + + "`L_ORDERKEY` INT NOT NULL ,`L_LINENUMBER` INT NOT NULL ,\n" + + "L_SUPPKEY BIGINT NOT NULL,\n" + + "`L_PARTKEY` INT NOT NULL ,\n" + + "`L_QUANTITY` Decimal(15, 2) NOT NULL ,\n" + + "`L_EXTENDEDPRICE` Decimal(15, 2) NOT NULL ,\n" + + "`L_DISCOUNT` Decimal(15, 2) NOT NULL ,\n" + + "`L_TAX` Decimal(15, 2) NOT NULL ,\n" + + "`L_RETURNFLAG` STRING NOT NULL ,\n" + + "`L_LINESTATUS` STRING NOT NULL ,\n" + + "`L_SHIPDATE` DATE NOT NULL ,\n" + + "`L_RECEIPTDATE` DATE NOT NULL ,\n" + + "`L_SHIPINSTRUCT` STRING NOT NULL ,\n" + + "`L_SHIPMODE` STRING NOT NULL ,\n" + + "`L_COMMENT` STRING NOT NULL \n" + + ") ENGINE=OLAP\n" + + " PRIMARY KEY (L_COMMITDATE, `L_ORDERKEY`,`L_LINENUMBER`, L_SUPPKEY)\n" + + "DISTRIBUTED BY HASH (`L_ORDERKEY`,`L_LINENUMBER`)PROPERTIES (\n" + + " \"replication_num\" = \"1\" \n" + + ")"; + Assertions.assertEquals(result, expected); + } }