Skip to content

Commit

Permalink
[Hotfix][Connector-V2][StarRocks] fix starrocks template sql parser a…
Browse files Browse the repository at this point in the history
…pache#5071 (apache#5332)

* [Hotfix][Connector-V2][StarRocks] fix starrocks template sql parser offset && add primary key for default template
  • Loading branch information
aijing-sun authored and Zhouwen-CN committed Sep 11, 2023
1 parent b5c864c commit 82db1f7
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public interface StarRocksSinkOptions {
.stringType()
.defaultValue(
"CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n"
+ "${rowtype_primary_key},\n"
+ "${rowtype_fields}\n"
+ ") ENGINE=OLAP\n"
+ " PRIMARY KEY (${rowtype_primary_key})\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import org.apache.commons.lang3.StringUtils;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -82,8 +84,14 @@ private static String mergeColumnInTemplate(
Map<String, Column> columnMap =
tableSchema.getColumns().stream()
.collect(Collectors.toMap(Column::getName, Function.identity()));
for (String col : columnInTemplate.keySet()) {
CreateTableParser.ColumnInfo columnInfo = columnInTemplate.get(col);
List<CreateTableParser.ColumnInfo> columnInfosInSeq =
columnInTemplate.values().stream()
.sorted(
Comparator.comparingInt(
CreateTableParser.ColumnInfo::getStartIndex))
.collect(Collectors.toList());
for (CreateTableParser.ColumnInfo columnInfo : columnInfosInSeq) {
String col = columnInfo.getName();
if (StringUtils.isEmpty(columnInfo.getInfo())) {
if (columnMap.containsKey(col)) {
Column column = columnMap.get(col);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSaveModeUtil;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
Expand Down Expand Up @@ -70,4 +73,83 @@ public void test() {

System.out.println(result);
}

@Test
public void testInSeq() {

List<Column> columns = new ArrayList<>();

columns.add(PhysicalColumn.of("L_ORDERKEY", BasicType.INT_TYPE, null, false, null, ""));
columns.add(PhysicalColumn.of("L_PARTKEY", BasicType.INT_TYPE, null, false, null, ""));
columns.add(PhysicalColumn.of("L_SUPPKEY", BasicType.INT_TYPE, null, false, null, ""));
columns.add(PhysicalColumn.of("L_LINENUMBER", BasicType.INT_TYPE, null, false, null, ""));
columns.add(PhysicalColumn.of("L_QUANTITY", new DecimalType(15, 2), null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_EXTENDEDPRICE", new DecimalType(15, 2), null, false, null, ""));
columns.add(PhysicalColumn.of("L_DISCOUNT", new DecimalType(15, 2), null, false, null, ""));
columns.add(PhysicalColumn.of("L_TAX", new DecimalType(15, 2), null, false, null, ""));
columns.add(
PhysicalColumn.of("L_RETURNFLAG", BasicType.STRING_TYPE, null, false, null, ""));
columns.add(
PhysicalColumn.of("L_LINESTATUS", BasicType.STRING_TYPE, null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_SHIPDATE", LocalTimeType.LOCAL_DATE_TYPE, null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_COMMITDATE", LocalTimeType.LOCAL_DATE_TYPE, null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_RECEIPTDATE", LocalTimeType.LOCAL_DATE_TYPE, null, false, null, ""));
columns.add(
PhysicalColumn.of("L_SHIPINSTRUCT", BasicType.STRING_TYPE, null, false, null, ""));
columns.add(PhysicalColumn.of("L_SHIPMODE", BasicType.STRING_TYPE, null, false, null, ""));
columns.add(PhysicalColumn.of("L_COMMENT", BasicType.STRING_TYPE, null, false, null, ""));

String result =
StarRocksSaveModeUtil.fillingCreateSql(
"CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n"
+ "`L_COMMITDATE`,\n"
+ "${rowtype_primary_key},\n"
+ "L_SUPPKEY BIGINT NOT NULL,\n"
+ "${rowtype_fields}\n"
+ ") ENGINE=OLAP\n"
+ " PRIMARY KEY (L_COMMITDATE, ${rowtype_primary_key}, L_SUPPKEY)\n"
+ "DISTRIBUTED BY HASH (${rowtype_primary_key})"
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\" \n"
+ ")",
"tpch",
"lineitem",
TableSchema.builder()
.primaryKey(
PrimaryKey.of(
"", Arrays.asList("L_ORDERKEY", "L_LINENUMBER")))
.columns(columns)
.build());
String expected =
"CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n"
+ "`L_COMMITDATE` DATE NOT NULL ,\n"
+ "`L_ORDERKEY` INT NOT NULL ,`L_LINENUMBER` INT NOT NULL ,\n"
+ "L_SUPPKEY BIGINT NOT NULL,\n"
+ "`L_PARTKEY` INT NOT NULL ,\n"
+ "`L_QUANTITY` Decimal(15, 2) NOT NULL ,\n"
+ "`L_EXTENDEDPRICE` Decimal(15, 2) NOT NULL ,\n"
+ "`L_DISCOUNT` Decimal(15, 2) NOT NULL ,\n"
+ "`L_TAX` Decimal(15, 2) NOT NULL ,\n"
+ "`L_RETURNFLAG` STRING NOT NULL ,\n"
+ "`L_LINESTATUS` STRING NOT NULL ,\n"
+ "`L_SHIPDATE` DATE NOT NULL ,\n"
+ "`L_RECEIPTDATE` DATE NOT NULL ,\n"
+ "`L_SHIPINSTRUCT` STRING NOT NULL ,\n"
+ "`L_SHIPMODE` STRING NOT NULL ,\n"
+ "`L_COMMENT` STRING NOT NULL \n"
+ ") ENGINE=OLAP\n"
+ " PRIMARY KEY (L_COMMITDATE, `L_ORDERKEY`,`L_LINENUMBER`, L_SUPPKEY)\n"
+ "DISTRIBUTED BY HASH (`L_ORDERKEY`,`L_LINENUMBER`)PROPERTIES (\n"
+ " \"replication_num\" = \"1\" \n"
+ ")";
Assertions.assertEquals(result, expected);
}
}

0 comments on commit 82db1f7

Please sign in to comment.