diff --git a/pom.xml b/pom.xml
index 516eae722b9..f6c82dc53a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1840,6 +1840,22 @@
                     </exclusion>
                 </exclusions>
             </dependency>
+            <!-- Calcite Server dependencies for DDL -->
+            <dependency>
+                <groupId>org.apache.calcite</groupId>
+                <artifactId>calcite-server</artifactId>
+                <version>${calcite.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>com.google.protobuf</groupId>
+                        <artifactId>protobuf-java</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.fasterxml.jackson.core</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
             <dependency>
                 <groupId>org.apache.calcite.avatica</groupId>
                 <artifactId>avatica-core</artifactId>
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java b/src/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
index 5776a5895b9..e946ea5ebc3 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/util/StringUtil.java
@@ -213,4 +213,16 @@ public static String[] split(String str, String splitBy) {
         return str.split(splitBy);
     }
 
+    public static String extractSubStringIgnoreSensitive(String origin, String sub) {
+        String s1 = origin.toLowerCase(Locale.ROOT);
+        String s2 = sub.toLowerCase(Locale.ROOT);
+        int i = s1.indexOf(s2);
+        if (i != -1) {
+            return origin.substring(i, i + sub.length());
+        } else {
+            return null;
+        }
+
+    }
+
 }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileMeasureType.java b/src/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileMeasureType.java
index 69ae984293a..3c3d5653414 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileMeasureType.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileMeasureType.java
@@ -38,6 +38,9 @@ public class PercentileMeasureType extends MeasureType<PercentileCounter> {
     public static final String FUNC_PERCENTILE = "PERCENTILE";
     public static final String FUNC_PERCENTILE_APPROX = "PERCENTILE_APPROX";
     public static final String DATATYPE_PERCENTILE = "percentile";
+    public static final String FUNC_PERCENTILE_100 = "PERCENTILE_100";
+    public static final String FUNC_PERCENTILE_1000 = "PERCENTILE_1000";
+    public static final String FUNC_PERCENTILE_10000 = "PERCENTILE_10000";
 
     public PercentileMeasureType(String funcName, DataType dataType) {
         this.dataType = dataType;
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
index 169a79cac58..15675de3e9b 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
@@ -28,9 +28,11 @@
 
 import lombok.Getter;
 import lombok.Setter;
+import lombok.ToString;
 
 @Getter
 @Setter
+@ToString
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class JoinTableDesc implements Serializable {
     private static final long serialVersionUID = 1L;
diff --git a/src/modeling-service/pom.xml b/src/modeling-service/pom.xml
index c560e34aaae..a91d23060a0 100644
--- a/src/modeling-service/pom.xml
+++ b/src/modeling-service/pom.xml
@@ -43,6 +43,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-datasource-service</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-query</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.12</artifactId>
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
index d83c15c7511..a96e7cd8a9b 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -70,6 +70,7 @@
 import static org.apache.kylin.job.execution.JobTypeEnum.INDEX_MERGE;
 import static org.apache.kylin.job.execution.JobTypeEnum.INDEX_REFRESH;
 import static org.apache.kylin.metadata.model.FunctionDesc.PARAMETER_TYPE_COLUMN;
+import static org.apache.kylin.query.util.DDLParser.UNDEFINED_TYPE;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -198,6 +199,8 @@
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.streaming.KafkaConfig;
+import org.apache.kylin.query.engine.KECalciteConfig;
+import org.apache.kylin.query.util.DDLParser;
 import org.apache.kylin.query.util.PushDownUtil;
 import org.apache.kylin.query.util.QueryParams;
 import org.apache.kylin.query.util.QueryUtil;
@@ -232,6 +235,7 @@
 import org.apache.kylin.rest.response.NDataModelResponse;
 import org.apache.kylin.rest.response.NDataSegmentResponse;
 import org.apache.kylin.rest.response.NModelDescResponse;
+import org.apache.kylin.rest.response.ParameterResponse;
 import org.apache.kylin.rest.response.PurgeModelAffectedResponse;
 import org.apache.kylin.rest.response.RefreshAffectedSegmentsResponse;
 import org.apache.kylin.rest.response.RelatedModelResponse;
@@ -331,6 +335,9 @@ public class ModelService extends AbstractModelService implements TableModelSupp
     @Autowired
     private IndexPlanService indexPlanService;
 
+    @Autowired
+    private TableService tableService;
+
     @Autowired(required = false)
     @Qualifier("modelBuildService")
     private ModelBuildSupporter modelBuildService;
@@ -2003,6 +2010,93 @@ public NDataModel createModel(String project, ModelRequest modelRequest) {
         }, project);
     }
 
+    public NDataModel createModelByDDl(String sql) throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        String convertedSql = QueryUtil.normalMassageSql(config, sql, 0, 0);
+        DDLParser ddlParser = DDLParser.CreateParser(KECalciteConfig.fromKapConfig(config));
+        DDLParser.DDLParserResult ddlResult = ddlParser.parseSQL(convertedSql);
+        String project = ddlResult.getProjectName();
+        ModelRequest modelRequest = convertToRequest(ddlResult);
+        aclEvaluate.checkProjectOperationPermission(modelRequest.getProject());
+
+        return createModel(project, modelRequest);
+    }
+
+    private ModelRequest convertToRequest(DDLParser.DDLParserResult ddlResult) {
+        val request = new ModelRequest();
+        request.setProject(ddlResult.getProjectName());
+        request.setAlias(ddlResult.getModelName());
+
+        //join relations
+        request.setJoinTables(ddlResult.getJoinTables());
+        request.setRootFactTableName(ddlResult.getFactTable());
+
+        // set partitionCol
+        PartitionDesc desc = new PartitionDesc();
+        if (ddlResult.getPartitionColName() != null) {
+            desc.setPartitionDateColumn(ddlResult.getPartitionColName());
+            desc.setPartitionDateFormat(setPartitionColType(ddlResult));
+        }
+        request.setPartitionDesc(desc);
+
+        // set dimensions and measures
+        request.setSimplifiedDimensions(ddlResult.getSimplifiedDimensions());
+        request.setSimplifiedMeasures(convertToSimplifiedMeasure(ddlResult.getProjectName(),
+                ddlResult.getSimplifiedMeasures(), ddlResult.getFactTable()));
+
+        // Default add base index
+        request.setWithBaseIndex(true);
+        return request;
+    }
+
+    private String setPartitionColType(DDLParser.DDLParserResult ddlResult) {
+        NTableMetadataManager tableManager = tableService.getManager(NTableMetadataManager.class,
+                ddlResult.getProjectName());
+        ColumnDesc col = tableManager
+                .getTableDesc(ddlResult.getFactTable().split("\\.")[0] + "."
+                        + ddlResult.getPartitionColName().split("\\.")[0])
+                .findColumnByName(ddlResult.getPartitionColName());
+        if (col == null) {
+            throw new KylinException(INVALID_PARAMETER, "Can not find partition col" + ddlResult.getPartitionColName());
+        }
+        if (col.getDatatype().toLowerCase().contains("int")) {
+            return "yyyyMMdd";
+        } else {
+            return "yyyy-MM-dd";
+        }
+    }
+
+    private List<SimplifiedMeasure> convertToSimplifiedMeasure(String project,
+            List<DDLParser.InnerMeasure> innerMeasures, String factTable) {
+        int id = 100000;
+        List<SimplifiedMeasure> result = Lists.newArrayList();
+        NTableMetadataManager tableManager = tableService.getManager(NTableMetadataManager.class, project);
+        for (DDLParser.InnerMeasure innerMeasure : innerMeasures) {
+            SimplifiedMeasure simplifiedMeasure = new SimplifiedMeasure();
+            simplifiedMeasure.setExpression(innerMeasure.getExpression());
+            simplifiedMeasure.setId(id++);
+            simplifiedMeasure.setParameterValue(innerMeasure.getParameterValues().stream().map(pair ->
+            // First is type, second is colName
+            new ParameterResponse(pair.getFirst(), pair.getSecond())).collect(Collectors.toList()));
+            // Must have at least one argument
+            String colNameWithTable = innerMeasure.getParameterValues().get(0).getSecond();
+            simplifiedMeasure.setName(colNameWithTable.toUpperCase(Locale.ROOT) + '_'
+                    + innerMeasure.getExpression().toUpperCase(Locale.ROOT));
+            if (innerMeasure.getReturnType() != UNDEFINED_TYPE) {
+                simplifiedMeasure.setReturnType(innerMeasure.getReturnType());
+            } else {
+                // Simple measures like min/max/sum need the column type inferred;
+                // tableManager lookups require the db_name prefix.
+                String datatype = tableManager
+                        .getTableDesc(factTable.split("\\.")[0] + "." + colNameWithTable.split("\\.")[0])
+                        .findColumnByName(colNameWithTable).getDatatype();
+                simplifiedMeasure.setReturnType(datatype);
+            }
+            result.add(simplifiedMeasure);
+        }
+        return result;
+    }
+
     private NDataModel doCheckBeforeModelSave(String project, ModelRequest modelRequest) {
         checkAliasExist(modelRequest.getUuid(), modelRequest.getAlias(), project);
         modelRequest.setOwner(AclPermissionUtil.getCurrentUsername());
diff --git a/src/query/pom.xml b/src/query/pom.xml
index ca0c7cc2f26..f87ade3e517 100644
--- a/src/query/pom.xml
+++ b/src/query/pom.xml
@@ -68,6 +68,10 @@
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-server</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>commons-collections</groupId>
diff --git a/src/query/src/main/java/org/apache/kylin/query/util/DDLParser.java b/src/query/src/main/java/org/apache/kylin/query/util/DDLParser.java
new file mode 100644
index 00000000000..d4bc5bc551e
--- /dev/null
+++ b/src/query/src/main/java/org/apache/kylin/query/util/DDLParser.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.util;
+
+import static org.apache.kylin.measure.percentile.PercentileMeasureType.FUNC_PERCENTILE;
+import static org.apache.kylin.measure.percentile.PercentileMeasureType.FUNC_PERCENTILE_100;
+import static org.apache.kylin.measure.percentile.PercentileMeasureType.FUNC_PERCENTILE_1000;
+import static org.apache.kylin.measure.percentile.PercentileMeasureType.FUNC_PERCENTILE_10000;
+import static org.apache.kylin.measure.percentile.PercentileMeasureType.FUNC_PERCENTILE_APPROX;
+import static org.apache.kylin.metadata.model.FunctionDesc.FUNC_COUNT;
+import static org.apache.kylin.metadata.model.FunctionDesc.FUNC_MAX;
+import static org.apache.kylin.metadata.model.FunctionDesc.FUNC_MIN;
+import static org.apache.kylin.metadata.model.FunctionDesc.FUNC_SUM;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.ddl.SqlCreateMaterializedView;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.ddl.ParseException;
+import org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.JoinTableDesc;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.query.engine.KECalciteConfig;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.kylin.query.exception.UnsupportedQueryException;
+
+public class DDLParser {
+    private final SqlParser.Config config;
+    public static final String FUNC_HLL_COUNT = "HLL_COUNT";
+    public static final String FUNC_HLL_COUNT_10 = "HLL_COUNT_10";
+    public static final String FUNC_HLL_COUNT_12 = "HLL_COUNT_12";
+    public static final String FUNC_HLL_COUNT_14 = "HLL_COUNT_14";
+    public static final String FUNC_HLL_COUNT_15 = "HLL_COUNT_15";
+    public static final String FUNC_HLL_COUNT_16 = "HLL_COUNT_16";
+    public static final String FUNC_BITMAP_COUNT = "BITMAP_COUNT";
+
+    private static final List<String> SUPPORT_MEASURE_PREFIX = Lists.newArrayList(FUNC_BITMAP_COUNT, FUNC_HLL_COUNT,
+            FUNC_PERCENTILE, FUNC_SUM, FUNC_MAX, FUNC_MIN, FUNC_COUNT, FUNC_HLL_COUNT_10, FUNC_HLL_COUNT_12,
+            FUNC_HLL_COUNT_14, FUNC_HLL_COUNT_15, FUNC_HLL_COUNT_16, FUNC_PERCENTILE_APPROX, FUNC_PERCENTILE_100,
+            FUNC_PERCENTILE_1000, FUNC_PERCENTILE_10000);
+
+    public DDLParser(SqlParser.Config config) {
+        this.config = config;
+    }
+
+    public static DDLParser CreateParser(KECalciteConfig connectionConfig) {
+        SqlParser.Config parserConfig = SqlParser.configBuilder().setQuotedCasing(connectionConfig.quotedCasing())
+                .setUnquotedCasing(connectionConfig.unquotedCasing()).setQuoting(connectionConfig.quoting())
+                .setIdentifierMaxLength(1024).setConformance(connectionConfig.conformance())
+                .setCaseSensitive(connectionConfig.caseSensitive()).setParserFactory(SqlDdlParserImpl.FACTORY).build();
+        return new DDLParser(parserConfig);
+    }
+
+    public DDLParserResult parseSQL(String sql) throws Exception {
+        SqlCreateMaterializedView sNode = (SqlCreateMaterializedView) SqlParser.create(sql, this.config).parseQuery();
+        DDLParserResult result = new DDLParserResult();
+        SqlIdentifier identifier = (SqlIdentifier) sNode.getOperandList().get(0);
+        SqlSelect sqlSelect = (SqlSelect) sNode.getOperandList().get(2);
+
+        parseFromIdentifier(identifier, result, sql);
+
+        parseDimensionsAndMeasures(sqlSelect, result);
+        if (sqlSelect.getFrom() instanceof SqlJoin) {
+            SqlJoin from = (SqlJoin) sqlSelect.getFrom();
+            parseFromWithJoin(from, result);
+        } else {
+            SqlIdentifier from = (SqlIdentifier) sqlSelect.getFrom();
+            parseFromWithOutJoin(from, result);
+        }
+
+        return result;
+    }
+
+    private void parseFromWithOutJoin(SqlIdentifier fact, DDLParserResult result) throws ParseException {
+        result.setFactTable(getTableFullName(fact.names));
+        result.setJoinTables(Lists.newArrayList());
+    }
+
+    private final static String emptyJoinConditionErr = "DDL not support without join condition!";
+    private final static String joinTypeErr = "DDL only support InnerJoin or LeftJoin!";
+    private final static String joinConditionErr = "DDL only support equal join!";
+
+    private void checkJoin(SqlJoin sqlJoin) throws ParseException {
+        if (sqlJoin.getCondition() == null) {
+            throw new ParseException(emptyJoinConditionErr);
+        } else {
+            SqlCall cond = (SqlCall) sqlJoin.getCondition();
+            if (cond.toString().contains(">") || cond.toString().contains("<")) {
+                throw new ParseException(joinConditionErr);
+            }
+        }
+        JoinType joinType = sqlJoin.getJoinType();
+        if (joinType != JoinType.INNER && joinType != JoinType.LEFT) {
+            throw new ParseException(joinTypeErr);
+        }
+    }
+
+    private void parseFromWithJoin(SqlJoin sqlJoin, DDLParserResult result) throws ParseException {
+        checkJoin(sqlJoin);
+        SqlIdentifier fact;
+        List<SqlIdentifier> lookUp = Lists.newArrayList();
+        List<JoinDesc> joinsDesc = Lists.newArrayList();
+        SqlNode left = sqlJoin.getLeft();
+        SqlNode right = sqlJoin.getRight();
+        if (right != null) {
+            lookUp.add((SqlIdentifier) right);
+            JoinDesc joinDesc = getJoinDesc(sqlJoin);
+            joinsDesc.add(joinDesc);
+        }
+        while (left instanceof SqlJoin) {
+            SqlJoin leftJoin = (SqlJoin) left;
+            checkJoin(leftJoin);
+            lookUp.add((SqlIdentifier) leftJoin.getRight());
+            JoinDesc joinDesc = getJoinDesc((SqlJoin) left);
+            joinsDesc.add(joinDesc);
+            left = leftJoin.getLeft();
+        }
+        fact = (SqlIdentifier) left;
+        // 1. set factTable
+        result.setFactTable(getTableFullName(fact.names));
+
+        if (lookUp.size() != joinsDesc.size()) {
+            String msg = "Parse join info size fail" + sqlJoin;
+            throw new ParseException(msg);
+        }
+
+        // 2. set lookupTable and joinDesc
+        List<JoinTableDesc> joinTableDesc = Lists.newArrayList();
+        for (int i = 0; i < lookUp.size(); i++) {
+            JoinTableDesc jd = new JoinTableDesc();
+            SqlIdentifier l = lookUp.get(i);
+            if (l.names.size() < 2) {
+                throw new ParseException("In joinCondition table name must be db_name.table_name");
+            }
+            jd.setTable(getTableFullName(l.names));
+            // `names` like table_name.col_name
+            jd.setAlias(l.names.get(1));
+            jd.setJoin(joinsDesc.get(i));
+            joinTableDesc.add(jd);
+        }
+        result.setJoinTables(joinTableDesc);
+    }
+
+    private void parseDimensionsAndMeasures(SqlSelect sqlSelect, DDLParserResult result) throws ParseException {
+        SqlNodeList selectList = sqlSelect.getSelectList();
+        List<SqlIdentifier> dims = Lists.newArrayList();
+        List<SqlBasicCall> meas = Lists.newArrayList();
+
+        for (SqlNode node : selectList) {
+            if (node instanceof SqlIdentifier) {
+                dims.add((SqlIdentifier) node);
+            } else if (node instanceof SqlBasicCall) {
+                meas.add((SqlBasicCall) node);
+            } else {
+                throw new ParseException("Unexpected select: ".concat(node.toString()));
+            }
+        }
+
+        if (dims.isEmpty()) {
+            throw new ParseException("In DDL dimensions should not be empty.");
+        }
+        parseDimsInner(dims, result);
+        parseMeasInner(meas, result);
+
+    }
+
+    private void parseMeasInner(List<SqlBasicCall> meas, DDLParserResult result) {
+        List<InnerMeasure> measures = meas.stream().map(m -> {
+            // 1. set measure type
+            InnerMeasure measure = new InnerMeasure();
+            String measureName = m.getOperator().getName();
+            try {
+                checkMeasure(measureName);
+            } catch (ParseException e) {
+                throw new UnsupportedQueryException(e.toString());
+            }
+            measure.setExpression(getMeasureExprInner(measureName));
+
+            // 2. set related column
+            List<Pair<String, String>> parameterValues = Arrays.stream(m.getOperands()).map(operand -> {
+                Pair<String, String> pair = new Pair<>();
+                pair.setFirst("column");
+                try {
+                    pair.setSecond(getColNameWithTable(((SqlIdentifier) operand).names));
+                } catch (ParseException e) {
+                    throw new UnsupportedQueryException(e.toString());
+                }
+                return pair;
+            }).collect(Collectors.toList());
+            measure.setParameterValues(parameterValues);
+
+            // 3. set measure return type
+            measure.setReturnType(getMeasureTypeInner(measureName));
+            return measure;
+        }).collect(Collectors.toList());
+        result.setSimplifiedMeasures(measures);
+    }
+
+    private void checkMeasure(String measureName) throws ParseException {
+        String upperCaseName = measureName.toUpperCase();
+        boolean res = SUPPORT_MEASURE_PREFIX.stream().anyMatch(str -> str.equals(upperCaseName));
+        if (!res) {
+            throw new ParseException("Measure type not support: " + measureName);
+        }
+    }
+
+    private void parseDimsInner(List<SqlIdentifier> dims, DDLParserResult result) {
+        List<NDataModel.NamedColumn> cols = dims.stream().map(d -> {
+            NDataModel.NamedColumn col = new NDataModel.NamedColumn();
+            try {
+                col.setAliasDotColumn(getColNameWithTable(d.names));
+                col.setName(getColNameWithTable(d.names).replace('.', '_'));
+            } catch (ParseException e) {
+                throw new UnsupportedQueryException(e.toString());
+            }
+            return col;
+        }).collect(Collectors.toList());
+        result.setSimplifiedDimensions(cols);
+    }
+
+    private void parseFromIdentifier(SqlIdentifier identifier, DDLParserResult result, String sql) throws Exception {
+        ImmutableList<String> names = identifier.names;
+        if (names.size() == 2 || names.size() == 4) {
+            // use `extractSubStringIgnoreSensitive` because project and model names are case-sensitive.
+            result.setProjectName(StringUtil.extractSubStringIgnoreSensitive(sql, names.get(0)));
+            result.setModelName(StringUtil.extractSubStringIgnoreSensitive(sql, names.get(1)));
+            if (names.size() == 4) {
+                result.setPartitionColName(names.get(2) + '.' + names.get(3));
+            }
+        } else {
+            throw new ParseException(
+                    "Identifier should contains project_name, model_name, partition_col_name(optional):" + names);
+        }
+    }
+
+    private JoinDesc getJoinDesc(SqlJoin join) throws ParseException {
+        JoinDesc res = new JoinDesc();
+        res.setType(join.getJoinType().toString().toUpperCase());
+        List<String> pKeys = Lists.newArrayList();
+        List<String> fKeys = Lists.newArrayList();
+        //Just get the outer condition
+        SqlBasicCall condition = (SqlBasicCall) join.getCondition();
+        List<SqlNode> operands = Arrays.stream(condition.getOperands()).collect(Collectors.toList());
+        for (int i = 0; i < operands.size(); i++) {
+            SqlNode operand = operands.get(i);
+            if (operand instanceof SqlBasicCall) {
+                SqlBasicCall call = (SqlBasicCall) operand;
+                operands.addAll(call.getOperandList());
+            } else if (operand instanceof SqlIdentifier) {
+                SqlIdentifier id = (SqlIdentifier) operand;
+                String colNameWithTable = getColNameWithTable(id.names);
+                // identifiers alternate between foreign-key and primary-key columns
+                if (pKeys.size() == fKeys.size()) {
+                    fKeys.add(colNameWithTable);
+                } else {
+                    pKeys.add(colNameWithTable);
+                }
+            }
+        }
+        res.setPrimaryKey(pKeys.toArray(new String[0]));
+        res.setForeignKey(fKeys.toArray(new String[0]));
+        return res;
+    }
+
+    private String getColNameWithTable(ImmutableList<String> names) throws ParseException {
+        if (names.size() == 2) {
+            return names.get(0) + '.' + names.get(1);
+        } else {
+            throw new ParseException("colName must be table_name.col_name, got:" + names);
+        }
+    }
+
+    private String getTableFullName(ImmutableList<String> names) throws ParseException {
+        if (names.size() == 2) {
+            return names.get(0) + '.' + names.get(1);
+        } else if (names.size() == 1) {
+            return "DEFAULT" + '.' + names.get(0);
+        } else {
+            throw new ParseException("tableName must be db_name.table_name, got:" + names);
+        }
+    }
+
+    // default set to 14
+    public static final String HLL_COUNT_TYPE = "hllc(14)";
+    public static final String HLL_COUNT_TYPE_10 = "hllc(10)";
+    public static final String HLL_COUNT_TYPE_12 = "hllc(12)";
+    public static final String HLL_COUNT_TYPE_14 = "hllc(14)";
+    public static final String HLL_COUNT_TYPE_15 = "hllc(15)";
+    public static final String HLL_COUNT_TYPE_16 = "hllc(16)";
+    public static final String BITMAP_COUNT_TYPE = "bitmap";
+    // default set to 100
+    public static final String PERCENTILE_TYPE = "percentile(100)";
+    public static final String PERCENTILE_TYPE_100 = "percentile(100)";
+    public static final String PERCENTILE_TYPE_1000 = "percentile(1000)";
+    public static final String PERCENTILE_TYPE_10000 = "percentile(10000)";
+    private static final String COUNT_DISTINCT_EXPR = "COUNT_DISTINCT";
+    private static final String PERCENTILE_EXPR = "PERCENTILE_APPROX";
+
+    // min, max and sum are set to `UNDEFINED`; Kylin infers the return type later
+    public static final String UNDEFINED_TYPE = "UNDEFINED";
+
+    private String getMeasureTypeInner(String measureName) {
+        switch (measureName) {
+        case "COUNT":
+            return "bigint";
+
+        case FUNC_PERCENTILE:
+        case FUNC_PERCENTILE_APPROX:
+            return PERCENTILE_TYPE;
+
+        case FUNC_HLL_COUNT:
+            return HLL_COUNT_TYPE;
+
+        case FUNC_BITMAP_COUNT:
+            return BITMAP_COUNT_TYPE;
+        // Support different HLL precisions
+        case FUNC_HLL_COUNT_10:
+            return HLL_COUNT_TYPE_10;
+        case FUNC_HLL_COUNT_12:
+            return HLL_COUNT_TYPE_12;
+        case FUNC_HLL_COUNT_14:
+            return HLL_COUNT_TYPE_14;
+        case FUNC_HLL_COUNT_15:
+            return HLL_COUNT_TYPE_15;
+        case FUNC_HLL_COUNT_16:
+            return HLL_COUNT_TYPE_16;
+        // Support different percentile precisions
+        case FUNC_PERCENTILE_100:
+            return PERCENTILE_TYPE_100;
+        case FUNC_PERCENTILE_1000:
+            return PERCENTILE_TYPE_1000;
+        case FUNC_PERCENTILE_10000:
+            return PERCENTILE_TYPE_10000;
+        default:
+            return UNDEFINED_TYPE;
+        }
+    }
+
+    private String getMeasureExprInner(String measureName) {
+        switch (measureName) {
+        case FUNC_PERCENTILE:
+        case FUNC_PERCENTILE_APPROX:
+        case FUNC_PERCENTILE_100:
+        case FUNC_PERCENTILE_1000:
+        case FUNC_PERCENTILE_10000:
+            return PERCENTILE_EXPR;
+
+        case FUNC_HLL_COUNT:
+        case FUNC_BITMAP_COUNT:
+        case FUNC_HLL_COUNT_10:
+        case FUNC_HLL_COUNT_12:
+        case FUNC_HLL_COUNT_14:
+        case FUNC_HLL_COUNT_15:
+        case FUNC_HLL_COUNT_16:
+            return COUNT_DISTINCT_EXPR;
+
+        default:
+            return measureName.toUpperCase();
+        }
+    }
+
+    @Getter
+    @Setter
+    @ToString
+    public static class DDLParserResult {
+        String modelName;
+        String ProjectName;
+        String partitionColName;
+        //just col_name
+        List<NDataModel.NamedColumn> simplifiedDimensions;
+        // see InnerMeasure
+        List<InnerMeasure> simplifiedMeasures;
+        String factTable;
+
+        // also means lookup tables
+        List<JoinTableDesc> joinTables;
+    }
+
+    @Getter
+    @Setter
+    @ToString
+    public static class InnerMeasure {
+        // MIN, MAX, HLL(10) etc
+        String expression;
+        // Measure return type like bigInt, Integer, bitmap, hll(10)
+        String returnType;
+        // ("column", "db_name.table_name.col_name")
+        List<Pair<String, String>> parameterValues;
+    }
+}
diff --git a/src/query/src/test/java/org/apache/kylin/query/util/DDLParserTest.java b/src/query/src/test/java/org/apache/kylin/query/util/DDLParserTest.java
new file mode 100644
index 00000000000..22d7060914b
--- /dev/null
+++ b/src/query/src/test/java/org/apache/kylin/query/util/DDLParserTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.util;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+
+import org.apache.calcite.sql.parser.ddl.ParseException;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.model.JoinTableDesc;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.query.engine.KECalciteConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Unit tests for {@link DDLParser}: join parsing, measure extraction,
+ *  name handling and rejection of unsupported join conditions. */
+public class DDLParserTest {
+    @Test
+    public void test_multi_join() throws Exception {
+
+        KylinConfig kylinConfig = mock(KylinConfig.class);
+        KECalciteConfig config = KECalciteConfig.fromKapConfig(kylinConfig);
+
+        String sql1 = "CREATE MATERIALIZED VIEW project.test_model.table1.c1_1 AS\n" + "SELECT table1.c1_1,\n"
+                + "       table2.c2_2,\n" + "       MAX(table1.c3),\n" + "       hll_count(table1.c4),\n"
+                + "       bitmap_count(table1.c5)\n" + "FROM \n" + "  db.table1 JOIN db.table2\n"
+                + "  ON table1.c1_1 = table2.c2_1 and table1.c1_3 = table2.c2_3\n" + "  JOIN db.table3\n"
+                + " ON table1.c1_2 = table2.c2_2\n" + "GROUP BY CUBE(table1.c1_1, table2.c2_2)";
+
+        DDLParser ddlParser = DDLParser.CreateParser(config);
+        DDLParser.DDLParserResult result = ddlParser.parseSQL(sql1);
+
+        Assert.assertEquals("project", result.getProjectName());
+        Assert.assertEquals("test_model", result.getModelName());
+        Assert.assertEquals("table1.c1_1".toUpperCase(), result.getPartitionColName());
+
+        List<NDataModel.NamedColumn> simplifiedDimensions = result.getSimplifiedDimensions();
+        Assert.assertEquals(2, simplifiedDimensions.size());
+        // id values are not part of the comparison
+        Assert.assertEquals(
+                "[NDataModel.NamedColumn(id=0, name=TABLE1_C1_1, aliasDotColumn=TABLE1.C1_1, status=EXIST), NDataModel.NamedColumn(id=0, name=TABLE2_C2_2, aliasDotColumn=TABLE2.C2_2, status=EXIST)]",
+                simplifiedDimensions.toString());
+
+        List<DDLParser.InnerMeasure> simplifiedMeasures = result.getSimplifiedMeasures();
+        Assert.assertEquals(3, simplifiedMeasures.size());
+        Assert.assertEquals(
+                "DDLParser.InnerMeasure(expression=MAX, returnType=UNDEFINED, parameterValues=[{column,TABLE1.C3}])",
+                simplifiedMeasures.get(0).toString());
+        Assert.assertEquals(
+                "DDLParser.InnerMeasure(expression=COUNT_DISTINCT, returnType=hllc(14), parameterValues=[{column,TABLE1.C4}])",
+                simplifiedMeasures.get(1).toString());
+        Assert.assertEquals(
+                "DDLParser.InnerMeasure(expression=COUNT_DISTINCT, returnType=bitmap, parameterValues=[{column,TABLE1.C5}])",
+                simplifiedMeasures.get(2).toString());
+
+        Assert.assertEquals("db.table1".toUpperCase(), result.getFactTable());
+
+        List<JoinTableDesc> joinTables = result.getJoinTables();
+        Assert.assertEquals(2, joinTables.size());
+        Assert.assertEquals(
+                "JoinTableDesc(table=DB.TABLE3, kind=LOOKUP, alias=TABLE3, join=JoinDesc [type=INNER, primary_key=[TABLE2.C2_2], foreign_key=[TABLE1.C1_2]], flattenable=null, joinRelationTypeEnum=MANY_TO_ONE, tableRef=null)",
+                joinTables.get(0).toString());
+        Assert.assertEquals(
+                "JoinTableDesc(table=DB.TABLE2, kind=LOOKUP, alias=TABLE2, join=JoinDesc [type=INNER, primary_key=[TABLE2.C2_1, TABLE2.C2_3], foreign_key=[TABLE1.C1_1, TABLE1.C1_3]], flattenable=null, joinRelationTypeEnum=MANY_TO_ONE, tableRef=null)",
+                joinTables.get(1).toString());
+
+    }
+
+    @Test
+    public void test_without_join() throws Exception {
+        KylinConfig kylinConfig = mock(KylinConfig.class);
+        KECalciteConfig config = KECalciteConfig.fromKapConfig(kylinConfig);
+
+        String sql1 = "CREATE MATERIALIZED VIEW project.test_model AS\n" + "SELECT table1.c1,\n" + "       table1.c2,\n"
+                + "       percentile(table1.c3)\n" + "FROM \n" + "  db.table1 \n"
+                + "GROUP BY CUBE(table1.c1, table1.c2)";
+
+        DDLParser ddlParser = DDLParser.CreateParser(config);
+        DDLParser.DDLParserResult result = ddlParser.parseSQL(sql1);
+
+        Assert.assertEquals("project", result.getProjectName());
+        Assert.assertEquals("test_model", result.getModelName());
+        Assert.assertNull(result.getPartitionColName());
+
+        List<NDataModel.NamedColumn> simplifiedDimensions = result.getSimplifiedDimensions();
+        Assert.assertEquals(2, simplifiedDimensions.size());
+        Assert.assertEquals(
+                "[NDataModel.NamedColumn(id=0, name=TABLE1_C1, aliasDotColumn=TABLE1.C1, status=EXIST), NDataModel.NamedColumn(id=0, name=TABLE1_C2, aliasDotColumn=TABLE1.C2, status=EXIST)]",
+                simplifiedDimensions.toString());
+
+        List<DDLParser.InnerMeasure> simplifiedMeasures = result.getSimplifiedMeasures();
+        Assert.assertEquals(1, simplifiedMeasures.size());
+        Assert.assertEquals(
+                "DDLParser.InnerMeasure(expression=PERCENTILE_APPROX, returnType=percentile(100), parameterValues=[{column,TABLE1.C3}])",
+                simplifiedMeasures.get(0).toString());
+        Assert.assertEquals("db.table1".toUpperCase(), result.getFactTable());
+
+        List<JoinTableDesc> joinTables = result.getJoinTables();
+        Assert.assertEquals(0, joinTables.size());
+
+    }
+
+    @Test
+    public void test_one_join() throws Exception {
+
+        KylinConfig kylinConfig = mock(KylinConfig.class);
+        KECalciteConfig config = KECalciteConfig.fromKapConfig(kylinConfig);
+
+        String sql1 = "CREATE MATERIALIZED VIEW project.test_model.table1.c1_1 AS\n" + "SELECT table1.c1_1,\n"
+                + "       table2.c2_2,\n" + "       MAX(table1.c3)\n" + "FROM \n" + "  db.table1 JOIN db.table2\n"
+                + "  ON table1.c1_1 = table2.c2_1 and table1.c1_3 = table2.c2_3\n"
+                + "GROUP BY CUBE(table1.c1_1, table2.c2_2)";
+
+        DDLParser ddlParser = DDLParser.CreateParser(config);
+        DDLParser.DDLParserResult result = ddlParser.parseSQL(sql1);
+
+        Assert.assertEquals("project", result.getProjectName());
+        Assert.assertEquals("test_model", result.getModelName());
+        Assert.assertEquals("table1.c1_1".toUpperCase(), result.getPartitionColName());
+
+        List<NDataModel.NamedColumn> simplifiedDimensions = result.getSimplifiedDimensions();
+        Assert.assertEquals(2, simplifiedDimensions.size());
+        // id values are not part of the comparison
+        Assert.assertEquals(
+                "[NDataModel.NamedColumn(id=0, name=TABLE1_C1_1, aliasDotColumn=TABLE1.C1_1, status=EXIST), NDataModel.NamedColumn(id=0, name=TABLE2_C2_2, aliasDotColumn=TABLE2.C2_2, status=EXIST)]",
+                simplifiedDimensions.toString());
+
+        List<DDLParser.InnerMeasure> simplifiedMeasures = result.getSimplifiedMeasures();
+        Assert.assertEquals(1, simplifiedMeasures.size());
+        Assert.assertEquals(
+                "DDLParser.InnerMeasure(expression=MAX, returnType=UNDEFINED, parameterValues=[{column,TABLE1.C3}])",
+                simplifiedMeasures.get(0).toString());
+
+        Assert.assertEquals("db.table1".toUpperCase(), result.getFactTable());
+
+        List<JoinTableDesc> joinTables = result.getJoinTables();
+        Assert.assertEquals(1, joinTables.size());
+        Assert.assertEquals(
+                "JoinTableDesc(table=DB.TABLE2, kind=LOOKUP, alias=TABLE2, join=JoinDesc [type=INNER, primary_key=[TABLE2.C2_1, TABLE2.C2_3], foreign_key=[TABLE1.C1_1, TABLE1.C1_3]], flattenable=null, joinRelationTypeEnum=MANY_TO_ONE, tableRef=null)",
+                joinTables.get(0).toString());
+    }
+
+    @Test
+    public void model_project_name_case() throws Exception {
+
+        KylinConfig kylinConfig = mock(KylinConfig.class);
+        KECalciteConfig config = KECalciteConfig.fromKapConfig(kylinConfig);
+
+        String sql1 = "CREATE MATERIALIZED VIEW proJect_Name.tesT_model.table1.c1_1 AS\n" + "SELECT table1.c1_1,\n"
+                + "       table2.c2_2,\n" + "       MAX(table1.c3)\n" + "FROM \n" + "  db.table1 JOIN db.table2\n"
+                + "  ON table1.c1_1 = table2.c2_1 and table1.c1_3 = table2.c2_3\n"
+                + "GROUP BY CUBE(table1.c1_1, table2.c2_2)";
+
+        DDLParser ddlParser = DDLParser.CreateParser(config);
+        DDLParser.DDLParserResult result = ddlParser.parseSQL(sql1);
+
+        Assert.assertEquals("proJect_Name", result.getProjectName());
+        Assert.assertEquals("tesT_model", result.getModelName());
+        Assert.assertEquals("table1.c1_1".toUpperCase(), result.getPartitionColName());
+    }
+
+    @Test
+    public void test_forbidden_join_condition() throws Exception {
+        KylinConfig kylinConfig = mock(KylinConfig.class);
+        KECalciteConfig config = KECalciteConfig.fromKapConfig(kylinConfig);
+
+        // 1. no equal join
+        String sql1 = "CREATE MATERIALIZED VIEW proJect_Name.tesT_model.table1.c1_1 AS\n" + "SELECT table1.c1_1,\n"
+                + "       table2.c2_2,\n" + "       MAX(table1.c3)\n" + "FROM \n" + "  db.table1 JOIN db.table2\n"
+                + "  ON table1.c1_1 = table2.c2_1 and table1.c1_3 > table2.c2_3\n"
+                + "GROUP BY CUBE(table1.c1_1, table2.c2_2)";
+
+        DDLParser ddlParser = DDLParser.CreateParser(config);
+        Assert.assertThrows(ParseException.class, () -> ddlParser.parseSQL(sql1));
+
+        // 2. no equal join
+        String sql2 = "CREATE MATERIALIZED VIEW proJect_Name.tesT_model.table1.c1_1 AS\n" + "SELECT table1.c1_1,\n"
+                + "       table2.c2_2,\n" + "       MAX(table1.c3)\n" + "FROM \n" + "  db.table1 JOIN db.table2\n"
+                + "  ON table1.c1_1 > table2.c2_1 \n" + "GROUP BY CUBE(table1.c1_1, table2.c2_2)";
+
+        Assert.assertThrows(ParseException.class, () -> ddlParser.parseSQL(sql2));
+
+        // 3. no join condition
+        String sql3 = "CREATE MATERIALIZED VIEW proJect_Name.tesT_model.table1.c1_1 AS\n" + "SELECT table1.c1_1,\n"
+                + "       table2.c2_2,\n" + "       MAX(table1.c3)\n" + "FROM \n" + "  db.table1 JOIN db.table2\n"
+                + "GROUP BY CUBE(table1.c1_1, table2.c2_2)";
+        Assert.assertThrows(ParseException.class, () -> ddlParser.parseSQL(sql3));
+
+        // 4. right join
+        String sql4 = "CREATE MATERIALIZED VIEW proJect_Name.tesT_model.table1.c1_1 AS\n" + "SELECT table1.c1_1,\n"
+                + "       table2.c2_2,\n" + "       MAX(table1.c3)\n" + "FROM \n" + "  db.table1 RIGHT JOIN db.table2\n"
+                + "  ON table1.c1_1 = table2.c2_1 and table1.c1_3 = table2.c2_3\n"
+                + "GROUP BY CUBE(table1.c1_1, table2.c2_2)";
+        Assert.assertThrows(ParseException.class, () -> ddlParser.parseSQL(sql4));
+    }
+
+    @Test
+    public void test_measure_diff_accuracy_type() throws Exception {
+        KylinConfig kylinConfig = mock(KylinConfig.class);
+        KECalciteConfig config = KECalciteConfig.fromKapConfig(kylinConfig);
+
+        String sql1 = "CREATE MATERIALIZED VIEW project.test_model AS\n" + "SELECT table1.c1,\n" + "       table1.c2,\n"
+                + "       percentile_100(table1.c3),\n" + "       percentile_1000(table1.c3),\n"
+                + "       percentile_10000(table1.c3),\n" + "       hll_count_10(table1.c4),\n"
+                + "       hll_count_12(table1.c4),\n" + "       hll_count_14(table1.c4),\n"
+                + "       hll_count_15(table1.c4),\n" + "       hll_count_16(table1.c4)\n" + "FROM \n"
+                + "  db.table1 \n" + "GROUP BY CUBE(table1.c1, table1.c2)";
+
+        DDLParser ddlParser = DDLParser.CreateParser(config);
+        DDLParser.DDLParserResult result = ddlParser.parseSQL(sql1);
+
+        List<NDataModel.NamedColumn> simplifiedDimensions = result.getSimplifiedDimensions();
+        Assert.assertEquals(2, simplifiedDimensions.size());
+        Assert.assertEquals(
+                "[NDataModel.NamedColumn(id=0, name=TABLE1_C1, aliasDotColumn=TABLE1.C1, status=EXIST), NDataModel.NamedColumn(id=0, name=TABLE1_C2, aliasDotColumn=TABLE1.C2, status=EXIST)]",
+                simplifiedDimensions.toString());
+
+        List<DDLParser.InnerMeasure> simplifiedMeasures = result.getSimplifiedMeasures();
+        Assert.assertEquals(8, simplifiedMeasures.size());
+        Assert.assertEquals(
+                "DDLParser.InnerMeasure(expression=PERCENTILE_APPROX, returnType=percentile(100), parameterValues=[{column,TABLE1.C3}])",
+                simplifiedMeasures.get(0).toString());
+        Assert.assertEquals("db.table1".toUpperCase(), result.getFactTable());
+        Assert.assertEquals(
+                "DDLParser.InnerMeasure(expression=PERCENTILE_APPROX, returnType=percentile(1000), parameterValues=[{column,TABLE1.C3}])",
+                simplifiedMeasures.get(1).toString());
+
+        Assert.assertEquals(
+                "DDLParser.InnerMeasure(expression=PERCENTILE_APPROX, returnType=percentile(10000), parameterValues=[{column,TABLE1.C3}])",
+                simplifiedMeasures.get(2).toString());
+
+
+        Assert.assertEquals(
+                "DDLParser.InnerMeasure(expression=COUNT_DISTINCT, returnType=hllc(10), parameterValues=[{column,TABLE1.C4}])",
+                simplifiedMeasures.get(3).toString());
+        Assert.assertEquals(
+                "DDLParser.InnerMeasure(expression=COUNT_DISTINCT, returnType=hllc(12), parameterValues=[{column,TABLE1.C4}])",
+                simplifiedMeasures.get(4).toString());
+        Assert.assertEquals(
+                "DDLParser.InnerMeasure(expression=COUNT_DISTINCT, returnType=hllc(14), parameterValues=[{column,TABLE1.C4}])",
+                simplifiedMeasures.get(5).toString());
+        Assert.assertEquals(
+                "DDLParser.InnerMeasure(expression=COUNT_DISTINCT, returnType=hllc(15), parameterValues=[{column,TABLE1.C4}])",
+                simplifiedMeasures.get(6).toString());
+        Assert.assertEquals(
+                "DDLParser.InnerMeasure(expression=COUNT_DISTINCT, returnType=hllc(16), parameterValues=[{column,TABLE1.C4}])",
+                simplifiedMeasures.get(7).toString());
+
+    }
+
+}