Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYLIN-5473] Use Calcite to parse DDL to create Model #2099

Open
wants to merge 3 commits into
base: kylin5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1840,6 +1840,22 @@
</exclusion>
</exclusions>
</dependency>
<!-- Calcite Server dependencies for DDL -->
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-server</artifactId>
<version>${calcite.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,16 @@ public static String[] split(String str, String splitBy) {
return str.split(splitBy);
}

public static String extractSubStringIgnoreSensitive(String origin, String sub) {
String s1 = origin.toLowerCase(Locale.ROOT);
String s2 = sub.toLowerCase(Locale.ROOT);
int i = s1.indexOf(s2);
if (i != -1) {
return origin.substring(i, i + sub.length());
} else {
return null;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class PercentileMeasureType extends MeasureType<PercentileCounter> {
public static final String FUNC_PERCENTILE = "PERCENTILE";
public static final String FUNC_PERCENTILE_APPROX = "PERCENTILE_APPROX";
public static final String DATATYPE_PERCENTILE = "percentile";
public static final String FUNC_PERCENTILE_100 = "PERCENTILE_100";
public static final String FUNC_PERCENTILE_1000 = "PERCENTILE_1000";
public static final String FUNC_PERCENTILE_10000 = "PERCENTILE_10000";

public PercentileMeasureType(String funcName, DataType dataType) {
this.dataType = dataType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@ToString
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
public class JoinTableDesc implements Serializable {
private static final long serialVersionUID = 1L;
Expand Down
4 changes: 4 additions & 0 deletions src/modeling-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-datasource-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-query</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import static org.apache.kylin.job.execution.JobTypeEnum.INDEX_MERGE;
import static org.apache.kylin.job.execution.JobTypeEnum.INDEX_REFRESH;
import static org.apache.kylin.metadata.model.FunctionDesc.PARAMETER_TYPE_COLUMN;
import static org.apache.kylin.query.util.DDLParser.UNDEFINED_TYPE;

import java.io.IOException;
import java.math.BigDecimal;
Expand Down Expand Up @@ -198,6 +199,8 @@
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.streaming.KafkaConfig;
import org.apache.kylin.query.engine.KECalciteConfig;
import org.apache.kylin.query.util.DDLParser;
import org.apache.kylin.query.util.PushDownUtil;
import org.apache.kylin.query.util.QueryParams;
import org.apache.kylin.query.util.QueryUtil;
Expand Down Expand Up @@ -232,6 +235,7 @@
import org.apache.kylin.rest.response.NDataModelResponse;
import org.apache.kylin.rest.response.NDataSegmentResponse;
import org.apache.kylin.rest.response.NModelDescResponse;
import org.apache.kylin.rest.response.ParameterResponse;
import org.apache.kylin.rest.response.PurgeModelAffectedResponse;
import org.apache.kylin.rest.response.RefreshAffectedSegmentsResponse;
import org.apache.kylin.rest.response.RelatedModelResponse;
Expand Down Expand Up @@ -331,6 +335,9 @@ public class ModelService extends AbstractModelService implements TableModelSupp
@Autowired
private IndexPlanService indexPlanService;

@Autowired
private TableService tableService;

@Autowired(required = false)
@Qualifier("modelBuildService")
private ModelBuildSupporter modelBuildService;
Expand Down Expand Up @@ -2003,6 +2010,93 @@ public NDataModel createModel(String project, ModelRequest modelRequest) {
}, project);
}

public NDataModel createModelByDDl(String sql) throws Exception {
KylinConfig config = KylinConfig.getInstanceFromEnv();
String convertedSql = QueryUtil.normalMassageSql(config, sql, 0, 0);
DDLParser ddlParser = DDLParser.CreateParser(KECalciteConfig.fromKapConfig(config));
DDLParser.DDLParserResult ddlResult = ddlParser.parseSQL(convertedSql);
String project = ddlResult.getProjectName();
ModelRequest modelRequest = convertToRequest(ddlResult);
aclEvaluate.checkProjectOperationPermission(modelRequest.getProject());

return createModel(project, modelRequest);
}

private ModelRequest convertToRequest(DDLParser.DDLParserResult ddlResult) {
val request = new ModelRequest();
request.setProject(ddlResult.getProjectName());
request.setAlias(ddlResult.getModelName());

//join relations
request.setJoinTables(ddlResult.getJoinTables());
request.setRootFactTableName(ddlResult.getFactTable());

// set partitionCol
PartitionDesc desc = new PartitionDesc();
if (ddlResult.getPartitionColName() != null) {
desc.setPartitionDateColumn(ddlResult.getPartitionColName());
desc.setPartitionDateFormat(setPartitionColType(ddlResult));
}
request.setPartitionDesc(desc);

// set dimensions and measures
request.setSimplifiedDimensions(ddlResult.getSimplifiedDimensions());
request.setSimplifiedMeasures(convertToSimplifiedMeasure(ddlResult.getProjectName(),
ddlResult.getSimplifiedMeasures(), ddlResult.getFactTable()));

// Default add base index
request.setWithBaseIndex(true);
return request;
}

private String setPartitionColType(DDLParser.DDLParserResult ddlResult) {
NTableMetadataManager tableManager = tableService.getManager(NTableMetadataManager.class,
ddlResult.getProjectName());
ColumnDesc col = tableManager
.getTableDesc(ddlResult.getFactTable().split("\\.")[0] + "."
+ ddlResult.getPartitionColName().split("\\.")[0])
.findColumnByName(ddlResult.getPartitionColName());
if (col == null) {
throw new KylinException(INVALID_PARAMETER, "Can not find partition col" + ddlResult.getPartitionColName());
}
if (col.getDatatype().toLowerCase().contains("int")) {
return "yyyyMMdd";
} else {
return "yyyy-MM-dd";
}
}

private List<SimplifiedMeasure> convertToSimplifiedMeasure(String project,
List<DDLParser.InnerMeasure> innerMeasures, String factTable) {
int id = 100000;
List<SimplifiedMeasure> result = Lists.newArrayList();
NTableMetadataManager tableManager = tableService.getManager(NTableMetadataManager.class, project);
for (DDLParser.InnerMeasure innerMeasure : innerMeasures) {
SimplifiedMeasure simplifiedMeasure = new SimplifiedMeasure();
simplifiedMeasure.setExpression(innerMeasure.getExpression());
simplifiedMeasure.setId(id++);
simplifiedMeasure.setParameterValue(innerMeasure.getParameterValues().stream().map(pair ->
// Fist is type, second is colName
new ParameterResponse(pair.getFirst(), pair.getSecond())).collect(Collectors.toList()));
//Must at least have on args
String colNameWithTable = innerMeasure.getParameterValues().get(0).getSecond();
simplifiedMeasure.setName(colNameWithTable.toUpperCase(Locale.ROOT) + '_'
+ innerMeasure.getExpression().toUpperCase(Locale.ROOT));
if (innerMeasure.getReturnType() != UNDEFINED_TYPE) {
simplifiedMeasure.setReturnType(innerMeasure.getReturnType());
} else {
// Simple measure like min,max,sum need infer col type
// use tableManager should pass db_name.
String datatype = tableManager
.getTableDesc(factTable.split("\\.")[0] + "." + colNameWithTable.split("\\.")[0])
.findColumnByName(colNameWithTable).getDatatype();
simplifiedMeasure.setReturnType(datatype);
}
result.add(simplifiedMeasure);
}
return result;
}

private NDataModel doCheckBeforeModelSave(String project, ModelRequest modelRequest) {
checkAliasExist(modelRequest.getUuid(), modelRequest.getAlias(), project);
modelRequest.setOwner(AclPermissionUtil.getCurrentUsername());
Expand Down
4 changes: 4 additions & 0 deletions src/query/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-server</artifactId>
</dependency>

<dependency>
<groupId>commons-collections</groupId>
Expand Down
Loading