diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 47214c8c271af6..e4cedc2c3b9ae4 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -58,13 +58,11 @@ statementBase | supportedUnsetStatement #supportedUnsetStatementAlias | supportedRefreshStatement #supportedRefreshStatementAlias | supportedShowStatement #supportedShowStatementAlias + | supportedLoadStatement #supportedLoadStatementAlias | supportedRecoverStatement #supportedRecoverStatementAlias - | supportedLoadStatement #supportedLoadfStatementAlias | unsupportedStatement #unsupported ; - - unsupportedStatement : unsupportedUseStatement | unsupportedDmlStatement @@ -232,11 +230,12 @@ supportedShowStatement | SHOW TABLE tableId=INTEGER_VALUE #showTableId | SHOW WHITELIST #showWhitelist | SHOW TABLETS BELONG - tabletIds+=INTEGER_VALUE (COMMA tabletIds+=INTEGER_VALUE)* #showTabletsBelong + tabletIds+=INTEGER_VALUE (COMMA tabletIds+=INTEGER_VALUE)* #showTabletsBelong ; supportedLoadStatement : SYNC #sync + | createRoutineLoad #createRoutineLoadAlias ; unsupportedOtherStatement @@ -351,6 +350,14 @@ unsupportedShowStatement | SHOW WARM UP JOB wildWhere? #showWarmUpJob ; +createRoutineLoad + : CREATE ROUTINE LOAD label=multipartIdentifier (ON table=identifier)? + (WITH (APPEND | DELETE | MERGE))? + (loadProperty (COMMA loadProperty)*)? propertyClause? FROM type=identifier + LEFT_PAREN customProperties=propertyItemList RIGHT_PAREN + commentSpec? + ; + unsupportedLoadStatement : LOAD mysqlDataDesc (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)? @@ -362,11 +369,6 @@ unsupportedLoadStatement | STOP SYNC JOB name=multipartIdentifier #stopDataSyncJob | RESUME SYNC JOB name=multipartIdentifier #resumeDataSyncJob | PAUSE SYNC JOB name=multipartIdentifier #pauseDataSyncJob - | CREATE ROUTINE LOAD label=multipartIdentifier (ON table=identifier)? - (WITH (APPEND | DELETE | MERGE))? - (loadProperty (COMMA loadProperty)*)? propertyClause? FROM type=identifier - LEFT_PAREN customProperties=propertyItemList RIGHT_PAREN - commentSpec? #createRoutineLoadJob | PAUSE ROUTINE LOAD FOR label=multipartIdentifier #pauseRoutineLoad | PAUSE ALL ROUTINE LOAD #pauseAllRoutineLoad | RESUME ROUTINE LOAD FOR label=multipartIdentifier #resumeRoutineLoad @@ -376,11 +378,6 @@ unsupportedLoadStatement | SHOW ROUTINE LOAD TASK ((FROM | IN) database=identifier)? wildWhere? #showRoutineLoadTask | SHOW ALL? 
CREATE ROUTINE LOAD FOR label=multipartIdentifier #showCreateRoutineLoad | SHOW CREATE LOAD FOR label=multipartIdentifier #showCreateLoad - | importSequenceStatement #importSequenceStatementAlias - | importPrecedingFilterStatement #importPrecedingFilterStatementAlias - | importWhereStatement #importWhereStatementAlias - | importDeleteOnStatement #importDeleteOnStatementAlias - | importColumnsStatement #importColumnsStatementAlias ; loadProperty diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index a16259dcdb98d2..27379ccd28d098 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -34,6 +34,7 @@ import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; @@ -229,6 +230,51 @@ public CreateRoutineLoadStmt(LabelName labelName, String tableName, List loadPropertyList, OriginStatement origStmt, UserIdentity userIdentity, + Map jobProperties, String typeName, RoutineLoadDesc routineLoadDesc, + int desireTaskConcurrentNum, long maxErrorNum, double maxFilterRatio, long maxBatchIntervalS, + long maxBatchRows, long maxBatchSizeBytes, long execMemLimit, int sendBatchParallelism, String timezone, + String format, String jsonPaths, String jsonRoot, byte enclose, byte escape, long workloadGroupId, + boolean loadToSingleTablet, boolean strictMode, boolean isPartialUpdate, boolean stripOuterArray, + boolean numAsString, boolean fuzzyParse, AbstractDataSourceProperties dataSourceProperties) { + this.labelName = labelName; + this.dbName = dbName; + this.name = name; + this.tableName = tableName; + this.loadPropertyList = loadPropertyList; + this.setOrigStmt(origStmt); + this.setUserInfo(userIdentity); + this.jobProperties = jobProperties; + this.typeName = typeName; + this.routineLoadDesc = routineLoadDesc; + this.desiredConcurrentNum = desireTaskConcurrentNum; + this.maxErrorNum = maxErrorNum; + this.maxFilterRatio = maxFilterRatio; + this.maxBatchIntervalS = maxBatchIntervalS; + this.maxBatchRows = maxBatchRows; + this.maxBatchSizeBytes = maxBatchSizeBytes; + this.execMemLimit = execMemLimit; + this.sendBatchParallelism = sendBatchParallelism; + this.timezone = timezone; + this.format = format; + this.jsonPaths = jsonPaths; + this.jsonRoot = jsonRoot; + this.enclose = enclose; + this.escape = escape; + this.workloadGroupId = workloadGroupId; + this.loadToSingleTablet = loadToSingleTablet; + this.strictMode = strictMode; + this.isPartialUpdate = isPartialUpdate; + this.stripOuterArray = stripOuterArray; + this.numAsString = numAsString; + this.fuzzyParse = fuzzyParse; + this.dataSourceProperties = dataSourceProperties; + } + public String getName() { return name; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java index 3a5731944a6b95..86027ea9ceac5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java @@ -34,6 +34,11 @@ public Separator(String separator) { this.separator = null; } + public Separator(String separator, String 
oriSeparator) { + this.oriSeparator = oriSeparator; + this.separator = separator; + } + public String getOriSeparator() { return oriSeparator; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 5513b4f5ecbebd..b4fb4488925ed1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -84,6 +84,7 @@ import org.apache.doris.nereids.DorisParser.ConstantContext; import org.apache.doris.nereids.DorisParser.CreateMTMVContext; import org.apache.doris.nereids.DorisParser.CreateProcedureContext; +import org.apache.doris.nereids.DorisParser.CreateRoutineLoadContext; import org.apache.doris.nereids.DorisParser.CreateRowPolicyContext; import org.apache.doris.nereids.DorisParser.CreateTableContext; import org.apache.doris.nereids.DorisParser.CreateTableLikeContext; @@ -120,6 +121,12 @@ import org.apache.doris.nereids.DorisParser.IdentifierContext; import org.apache.doris.nereids.DorisParser.IdentifierListContext; import org.apache.doris.nereids.DorisParser.IdentifierSeqContext; +import org.apache.doris.nereids.DorisParser.ImportColumnsContext; +import org.apache.doris.nereids.DorisParser.ImportDeleteOnContext; +import org.apache.doris.nereids.DorisParser.ImportPartitionsContext; +import org.apache.doris.nereids.DorisParser.ImportPrecedingFilterContext; +import org.apache.doris.nereids.DorisParser.ImportSequenceContext; +import org.apache.doris.nereids.DorisParser.ImportWhereContext; import org.apache.doris.nereids.DorisParser.InPartitionDefContext; import org.apache.doris.nereids.DorisParser.IndexDefContext; import org.apache.doris.nereids.DorisParser.IndexDefsContext; @@ -135,6 +142,7 @@ import org.apache.doris.nereids.DorisParser.LateralViewContext; import org.apache.doris.nereids.DorisParser.LessThanPartitionDefContext; import org.apache.doris.nereids.DorisParser.LimitClauseContext; +import org.apache.doris.nereids.DorisParser.LoadPropertyContext; import org.apache.doris.nereids.DorisParser.LogicalBinaryContext; import org.apache.doris.nereids.DorisParser.LogicalNotContext; import org.apache.doris.nereids.DorisParser.MapLiteralContext; @@ -187,6 +195,7 @@ import org.apache.doris.nereids.DorisParser.SelectClauseContext; import org.apache.doris.nereids.DorisParser.SelectColumnClauseContext; import org.apache.doris.nereids.DorisParser.SelectHintContext; +import org.apache.doris.nereids.DorisParser.SeparatorContext; import org.apache.doris.nereids.DorisParser.SetCharsetContext; import org.apache.doris.nereids.DorisParser.SetCollateContext; import org.apache.doris.nereids.DorisParser.SetDefaultStorageVaultContext; @@ -517,6 +526,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateTableLikeInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateViewInfo; @@ -529,6 +539,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.GeneratedColumnDesc; import org.apache.doris.nereids.trees.plans.commands.info.InPartition; import 
org.apache.doris.nereids.trees.plans.commands.info.IndexDefinition; +import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo; import org.apache.doris.nereids.trees.plans.commands.info.LessThanPartition; import org.apache.doris.nereids.trees.plans.commands.info.MTMVPartitionDefinition; import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition; @@ -553,6 +564,15 @@ import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; +import org.apache.doris.nereids.trees.plans.commands.load.CreateRoutineLoadCommand; +import org.apache.doris.nereids.trees.plans.commands.load.LoadColumnClause; +import org.apache.doris.nereids.trees.plans.commands.load.LoadColumnDesc; +import org.apache.doris.nereids.trees.plans.commands.load.LoadDeleteOnClause; +import org.apache.doris.nereids.trees.plans.commands.load.LoadPartitionNames; +import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty; +import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator; +import org.apache.doris.nereids.trees.plans.commands.load.LoadSequenceClause; +import org.apache.doris.nereids.trees.plans.commands.load.LoadWhereClause; import org.apache.doris.nereids.trees.plans.commands.refresh.RefreshCatalogCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalCTE; @@ -1436,6 +1456,108 @@ public LogicalSubQueryAlias visitAliasQuery(AliasQueryContext ctx) { }); } + /** + * process LoadProperty in routine load + */ + public LoadProperty visitLoadProperty(LoadPropertyContext ctx) { + LoadProperty loadProperty = null; + if (ctx instanceof SeparatorContext) { + String separator = stripQuotes(((SeparatorContext) ctx).STRING_LITERAL().getText()); + loadProperty = new LoadSeparator(separator); + } else if (ctx instanceof ImportColumnsContext) { + List<LoadColumnDesc> descList = new ArrayList<>(); + for (DorisParser.ImportColumnDescContext loadColumnDescCtx : ((ImportColumnsContext) ctx) + .importColumnsStatement().importColumnDesc()) { + LoadColumnDesc desc; + if (loadColumnDescCtx.booleanExpression() != null) { + desc = new LoadColumnDesc(loadColumnDescCtx.name.getText(), + getExpression(loadColumnDescCtx.booleanExpression())); + } else { + desc = new LoadColumnDesc(loadColumnDescCtx.name.getText()); + } + descList.add(desc); + } + loadProperty = new LoadColumnClause(descList); + } else if (ctx instanceof ImportDeleteOnContext) { + loadProperty = new LoadDeleteOnClause(getExpression(((ImportDeleteOnContext) ctx) + .importDeleteOnStatement().booleanExpression())); + } else if (ctx instanceof ImportPartitionsContext) { + Pair<Boolean, List<String>> partitionSpec = visitPartitionSpec( + ((ImportPartitionsContext) ctx).partitionSpec()); + loadProperty = new LoadPartitionNames(partitionSpec.first, partitionSpec.second); + } else if (ctx instanceof ImportPrecedingFilterContext) { + loadProperty = new LoadWhereClause(getExpression(((ImportPrecedingFilterContext) ctx) + .importPrecedingFilterStatement().booleanExpression()), true); + } else if (ctx instanceof ImportSequenceContext) { + loadProperty = new LoadSequenceClause(((ImportSequenceContext) ctx) + .importSequenceStatement().identifier().getText()); + } else if (ctx instanceof ImportWhereContext) { + loadProperty = new LoadWhereClause(getExpression(((ImportWhereContext) ctx) +
.importWhereStatement().booleanExpression()), false); + } + return loadProperty; + } + + @Override + public LogicalPlan visitCreateRoutineLoad(CreateRoutineLoadContext ctx) { + List<String> labelParts = visitMultipartIdentifier(ctx.label); + String labelName = null; + String labelDbName = null; + if (ConnectContext.get().getDatabase().isEmpty() && labelParts.size() == 1) { + throw new AnalysisException("Current database is not set."); + } else if (labelParts.size() == 1) { + labelName = labelParts.get(0); + } else if (labelParts.size() == 2) { + labelDbName = labelParts.get(0); + labelName = labelParts.get(1); + } else if (labelParts.size() == 3) { + labelDbName = labelParts.get(1); + labelName = labelParts.get(2); + } else { + throw new AnalysisException("labelParts in load should be [ctl.][db.]label"); + } + LabelNameInfo jobLabelInfo = new LabelNameInfo(labelDbName, labelName); + String tableName = null; + if (ctx.table != null) { + tableName = ctx.table.getText(); + } + Map<String, String> properties = ctx.propertyClause() != null + // NOTICE: we should not generate immutable map here, because it will be modified when analyzing. + ? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) + : Maps.newHashMap(); + String type = ctx.type.getText(); + Map<String, String> customProperties = ctx.customProperties != null + // NOTICE: we should not generate immutable map here, because it will be modified when analyzing. + ? Maps.newHashMap(visitPropertyItemList(ctx.customProperties)) + : Maps.newHashMap(); + LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; + if (ctx.WITH() != null) { + if (ctx.DELETE() != null) { + mergeType = LoadTask.MergeType.DELETE; + } else if (ctx.MERGE() != null) { + mergeType = LoadTask.MergeType.MERGE; + } + } + String comment = visitCommentSpec(ctx.commentSpec()); + Map<String, LoadProperty> loadPropertyMap = new HashMap<>(); + for (DorisParser.LoadPropertyContext oneLoadPropertyContext : ctx.loadProperty()) { + LoadProperty loadProperty = visitLoadProperty(oneLoadPropertyContext); + if (loadProperty == null) { + throw new AnalysisException("invalid clause of routine load"); + } + if (loadPropertyMap.get(loadProperty.getClass().getName()) != null) { + throw new AnalysisException("repeat setting of clause load property: " + + loadProperty.getClass().getName()); + } else { + loadPropertyMap.put(loadProperty.getClass().getName(), loadProperty); + } + } + CreateRoutineLoadInfo createRoutineLoadInfo = new CreateRoutineLoadInfo(jobLabelInfo, tableName, + loadPropertyMap, properties, type, customProperties, mergeType, comment); + return new CreateRoutineLoadCommand(createRoutineLoadInfo); + + } + + @Override + public Command visitCreateRowPolicy(CreateRowPolicyContext ctx) { FilterType filterType = FilterType.of(ctx.type.getText()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 4e62a7e7269be9..57e330d90a3e06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -215,5 +215,6 @@ public enum PlanType { RECOVER_DATABASE_COMMAND, RECOVER_TABLE_COMMAND, RECOVER_PARTITION_COMMAND, - REPLAY_COMMAND + REPLAY_COMMAND, + CREATE_ROUTINE_LOAD_COMMAND } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java new file mode 100644 index
00000000000000..3e0fd0d34fb091 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java @@ -0,0 +1,517 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.info; + +import org.apache.doris.analysis.CreateRoutineLoadStmt; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ImportColumnDesc; +import org.apache.doris.analysis.ImportColumnsStmt; +import org.apache.doris.analysis.ImportDeleteOnStmt; +import org.apache.doris.analysis.ImportSequenceStmt; +import org.apache.doris.analysis.ImportWhereStmt; +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.analysis.Separator; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeNameFormat; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.common.util.Util; +import org.apache.doris.load.RoutineLoadDesc; +import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.load.routineload.AbstractDataSourceProperties; +import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory; +import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.analyzer.Scope; +import org.apache.doris.nereids.analyzer.UnboundRelation; +import org.apache.doris.nereids.glue.translator.ExpressionTranslator; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.nereids.jobs.executor.Rewriter; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.rules.analysis.BindRelation; +import org.apache.doris.nereids.rules.analysis.ExpressionAnalyzer; +import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.OlapScan; +import org.apache.doris.nereids.trees.plans.commands.load.LoadColumnClause; +import 
org.apache.doris.nereids.trees.plans.commands.load.LoadColumnDesc; +import org.apache.doris.nereids.trees.plans.commands.load.LoadDeleteOnClause; +import org.apache.doris.nereids.trees.plans.commands.load.LoadPartitionNames; +import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty; +import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator; +import org.apache.doris.nereids.trees.plans.commands.load.LoadSequenceClause; +import org.apache.doris.nereids.trees.plans.commands.load.LoadWhereClause; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; + +/** + * info in creating routine load. + */ +public class CreateRoutineLoadInfo { + // routine load properties + public static final String DESIRED_CONCURRENT_NUMBER_PROPERTY = "desired_concurrent_number"; + public static final String CURRENT_CONCURRENT_NUMBER_PROPERTY = "current_concurrent_number"; + // max error number in ten thousand records + public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number"; + public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio"; + // the following 3 properties limit the time and batch size of a single routine load task + public static final String MAX_BATCH_INTERVAL_SEC_PROPERTY = "max_batch_interval"; + public static final String MAX_BATCH_ROWS_PROPERTY = "max_batch_rows"; + public static final String MAX_BATCH_SIZE_PROPERTY = "max_batch_size"; + public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit"; + + public static final String FORMAT = "format"; // the value is csv or json, default is csv + public static final String STRIP_OUTER_ARRAY = "strip_outer_array"; + public static final String JSONPATHS = "jsonpaths"; + public static final String JSONROOT = "json_root"; + public static final String NUM_AS_STRING = "num_as_string"; + public static final String FUZZY_PARSE = "fuzzy_parse"; + public static final String PARTIAL_COLUMNS = "partial_columns"; + public static final String WORKLOAD_GROUP = "workload_group"; + public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; + public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; + public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet"; + public static final java.util.function.Predicate DESIRED_CONCURRENT_NUMBER_PRED = (v) -> v > 0L; + public static final java.util.function.Predicate MAX_ERROR_NUMBER_PRED = (v) -> v >= 0L; + public static final java.util.function.Predicate MAX_FILTER_RATIO_PRED = (v) -> v >= 0 && v <= 1; + public static final java.util.function.Predicate MAX_BATCH_INTERVAL_PRED = (v) -> v >= 1; + public static final java.util.function.Predicate MAX_BATCH_ROWS_PRED = (v) -> v >= 200000; + public static final java.util.function.Predicate MAX_BATCH_SIZE_PRED = (v) -> v >= 100 * 1024 * 1024 + && v <= (long) (1024 * 1024 * 1024) * 10; + public static final java.util.function.Predicate EXEC_MEM_LIMIT_PRED = (v) -> v >= 0L; + public static final Predicate SEND_BATCH_PARALLELISM_PRED = (v) -> 
v > 0L; + + private static final String NAME_TYPE = "ROUTINE LOAD NAME"; + + private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>() + .add(DESIRED_CONCURRENT_NUMBER_PROPERTY) + .add(MAX_ERROR_NUMBER_PROPERTY) + .add(MAX_FILTER_RATIO_PROPERTY) + .add(MAX_BATCH_INTERVAL_SEC_PROPERTY) + .add(MAX_BATCH_ROWS_PROPERTY) + .add(MAX_BATCH_SIZE_PROPERTY) + .add(FORMAT) + .add(JSONPATHS) + .add(STRIP_OUTER_ARRAY) + .add(NUM_AS_STRING) + .add(FUZZY_PARSE) + .add(JSONROOT) + .add(LoadStmt.STRICT_MODE) + .add(LoadStmt.TIMEZONE) + .add(EXEC_MEM_LIMIT_PROPERTY) + .add(SEND_BATCH_PARALLELISM) + .add(LOAD_TO_SINGLE_TABLET) + .add(PARTIAL_COLUMNS) + .add(WORKLOAD_GROUP) + .add(LoadStmt.KEY_ENCLOSE) + .add(LoadStmt.KEY_ESCAPE) + .build(); + + private final LabelNameInfo labelNameInfo; + private String tableName; + private final Map<String, LoadProperty> loadPropertyMap; + private final Map<String, String> jobProperties; + private final String typeName; + + // the following variables will be initialized after analyze + // -1 as unset, the default value will be set in RoutineLoadJob + private String name; + private String dbName; + private RoutineLoadDesc routineLoadDesc; + private int desiredConcurrentNum = 1; + private long maxErrorNum = -1; + private double maxFilterRatio = -1; + private long maxBatchIntervalS = -1; + private long maxBatchRows = -1; + private long maxBatchSizeBytes = -1; + private boolean strictMode = true; + private long execMemLimit = 2 * 1024 * 1024 * 1024L; + private String timezone = TimeUtils.DEFAULT_TIME_ZONE; + private int sendBatchParallelism = 1; + private boolean loadToSingleTablet = false; + /** + * RoutineLoad supports json data. + * Required params: + * 1) dataFormat = "json" + * 2) jsonPaths = "$.XXX.xxx" + */ + private String format = ""; //default is csv. + private String jsonPaths = ""; + private String jsonRoot = ""; // MUST be a jsonpath string + private boolean stripOuterArray = false; + private boolean numAsString = false; + private boolean fuzzyParse = false; + + private byte enclose; + + private byte escape; + + private long workloadGroupId = -1; + + /** + * support partial columns load (Only Unique Key Columns) + */ + private boolean isPartialUpdate = false; + + private String comment = ""; + + private LoadTask.MergeType mergeType; + + private boolean isMultiTable = false; + + private AbstractDataSourceProperties dataSourceProperties; + + /** + * constructor for create routine load info + */ + public CreateRoutineLoadInfo(LabelNameInfo labelNameInfo, String tableName, + Map<String, LoadProperty> loadPropertyMap, + Map<String, String> jobProperties, String typeName, + Map<String, String> dataSourceProperties, LoadTask.MergeType mergeType, + String comment) { + this.labelNameInfo = labelNameInfo; + if (StringUtils.isBlank(tableName)) { + this.isMultiTable = true; + } + this.tableName = tableName; + this.loadPropertyMap = loadPropertyMap; + this.jobProperties = jobProperties == null ?
Maps.newHashMap() : jobProperties; + this.typeName = typeName.toUpperCase(); + this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory + .createDataSource(typeName, dataSourceProperties, this.isMultiTable); + this.mergeType = mergeType; + this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true"); + if (comment != null) { + this.comment = comment; + } + } + + /** + * validate the create routine load info + */ + public void validate(ConnectContext ctx) throws UserException { + // check dbName and tableName + checkDBTable(ctx); + // check name + try { + FeNameFormat.checkCommonName(NAME_TYPE, name); + } catch (org.apache.doris.common.AnalysisException e) { + // 64 is the length of regular expression matching + // (FeNameFormat.COMMON_NAME_REGEX/UNDERSCORE_COMMON_NAME_REGEX) + throw new AnalysisException(e.getMessage() + + " Maybe routine load job name is longer than 64 or contains illegal characters"); + } + // check load properties include column separator etc. + checkLoadProperties(ctx); + // check routine load job properties include desired concurrent number etc. + checkJobProperties(); + // check data source properties + checkDataSourceProperties(); + // analyze merge type + if (routineLoadDesc != null) { + if (mergeType != LoadTask.MergeType.MERGE && routineLoadDesc.getDeleteCondition() != null) { + throw new AnalysisException("not support DELETE ON clause when merge type is not MERGE."); + } + if (mergeType == LoadTask.MergeType.MERGE && routineLoadDesc.getDeleteCondition() == null) { + throw new AnalysisException("Expected DELETE ON clause when merge type is MERGE."); + } + } else if (mergeType == LoadTask.MergeType.MERGE) { + throw new AnalysisException("Expected DELETE ON clause when merge type is MERGE."); + } + } + + private void checkDBTable(ConnectContext ctx) throws AnalysisException { + labelNameInfo.validate(ctx); + dbName = labelNameInfo.getDb(); + name = labelNameInfo.getLabel(); + + Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName); + if (isPartialUpdate && isMultiTable) { + throw new AnalysisException("Partial update is not supported in multi-table load."); + } + if (isMultiTable) { + return; + } + if (Strings.isNullOrEmpty(tableName)) { + throw new AnalysisException("Table name should not be null"); + } + Table table = db.getTableOrAnalysisException(tableName); + if (mergeType != LoadTask.MergeType.APPEND + && (table.getType() != Table.TableType.OLAP + || ((OlapTable) table).getKeysType() != KeysType.UNIQUE_KEYS)) { + throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables."); + } + if (mergeType != LoadTask.MergeType.APPEND + && !(table.getType() == Table.TableType.OLAP && ((OlapTable) table).hasDeleteSign())) { + throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete."); + } + if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) { + throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW"); + } + } + + private void checkLoadProperties(ConnectContext ctx) throws UserException { + Separator columnSeparator = null; + // TODO(yangzhengguo01): add line delimiter to properties + Separator lineDelimiter = null; + ImportColumnsStmt importColumnsStmt = null; + ImportWhereStmt precedingImportWhereStmt = null; + ImportWhereStmt importWhereStmt = null; + ImportSequenceStmt importSequenceStmt = null; + PartitionNames partitionNames = null; + ImportDeleteOnStmt
importDeleteOnStmt = null; + CascadesContext cascadesContext = null; + ExpressionAnalyzer analyzer = null; + PlanTranslatorContext context = null; + if (!isMultiTable) { + List nameParts = Lists.newArrayList(); + nameParts.add(dbName); + nameParts.add(tableName); + Plan unboundRelation = new UnboundRelation(StatementScopeIdGenerator.newRelationId(), nameParts); + cascadesContext = CascadesContext.initContext(ctx.getStatementContext(), unboundRelation, + PhysicalProperties.ANY); + Rewriter.getWholeTreeRewriterWithCustomJobs(cascadesContext, + ImmutableList.of(Rewriter.bottomUp(new BindRelation()))).execute(); + Plan boundRelation = cascadesContext.getRewritePlan(); + // table could have delete sign in LogicalFilter above + if (cascadesContext.getRewritePlan() instanceof LogicalFilter) { + boundRelation = (Plan) ((LogicalFilter) cascadesContext.getRewritePlan()).child(); + } + context = new PlanTranslatorContext(cascadesContext); + List slots = boundRelation.getOutput(); + Scope scope = new Scope(slots); + analyzer = new ExpressionAnalyzer(null, scope, cascadesContext, false, false); + + Map translateMap = Maps.newHashMap(); + + TupleDescriptor tupleDescriptor = context.generateTupleDesc(); + tupleDescriptor.setTable(((OlapScan) boundRelation).getTable()); + for (int i = 0; i < boundRelation.getOutput().size(); i++) { + SlotReference slotReference = (SlotReference) boundRelation.getOutput().get(i); + SlotRef slotRef = new SlotRef(null, slotReference.getName()); + translateMap.put(slotReference, slotRef); + context.createSlotDesc(tupleDescriptor, slotReference, ((OlapScan) boundRelation).getTable()); + } + } + + if (loadPropertyMap != null) { + for (LoadProperty loadProperty : loadPropertyMap.values()) { + loadProperty.validate(); + if (loadProperty instanceof LoadSeparator) { + String oriSeparator = ((LoadSeparator) loadProperty).getOriSeparator(); + String separator = Separator.convertSeparator(oriSeparator); + columnSeparator = new Separator(separator, oriSeparator); + } else if (loadProperty instanceof LoadColumnClause) { + if (isMultiTable) { + throw new AnalysisException("Multi-table load does not support setting columns info"); + } + List importColumnDescList = new ArrayList<>(); + for (LoadColumnDesc columnDesc : ((LoadColumnClause) loadProperty).getColumns()) { + if (columnDesc.getExpression() != null) { + Expr expr = translateToLegacyExpr(columnDesc.getExpression(), analyzer, + context, cascadesContext); + importColumnDescList.add(new ImportColumnDesc(columnDesc.getColumnName(), expr)); + } else { + importColumnDescList.add(new ImportColumnDesc(columnDesc.getColumnName(), null)); + } + } + importColumnsStmt = new ImportColumnsStmt(importColumnDescList); + } else if (loadProperty instanceof LoadWhereClause) { + if (isMultiTable) { + throw new AnalysisException("Multi-table load does not support setting columns info"); + } + Expr expr = translateToLegacyExpr(((LoadWhereClause) loadProperty).getExpression(), + analyzer, context, cascadesContext); + if (((LoadWhereClause) loadProperty).isPreceding()) { + precedingImportWhereStmt = new ImportWhereStmt(expr, + ((LoadWhereClause) loadProperty).isPreceding()); + } else { + importWhereStmt = new ImportWhereStmt(expr, ((LoadWhereClause) loadProperty).isPreceding()); + } + } else if (loadProperty instanceof LoadPartitionNames) { + partitionNames = new PartitionNames(((LoadPartitionNames) loadProperty).isTemp(), + ((LoadPartitionNames) loadProperty).getPartitionNames()); + } else if (loadProperty instanceof LoadDeleteOnClause) { + Expr expr = 
translateToLegacyExpr(((LoadDeleteOnClause) loadProperty).getExpression(), + analyzer, context, cascadesContext); + importDeleteOnStmt = new ImportDeleteOnStmt(expr); + } else if (loadProperty instanceof LoadSequenceClause) { + importSequenceStmt = new ImportSequenceStmt( + ((LoadSequenceClause) loadProperty).getSequenceColName()); + } + } + } + routineLoadDesc = new RoutineLoadDesc(columnSeparator, lineDelimiter, importColumnsStmt, + precedingImportWhereStmt, importWhereStmt, + partitionNames, importDeleteOnStmt == null ? null : importDeleteOnStmt.getExpr(), mergeType, + importSequenceStmt == null ? null : importSequenceStmt.getSequenceColName()); + } + + private Expr translateToLegacyExpr(Expression expr, ExpressionAnalyzer analyzer, PlanTranslatorContext context, + CascadesContext cascadesContext) { + Expression expression; + try { + expression = analyzer.analyze(expr, new ExpressionRewriteContext(cascadesContext)); + } catch (org.apache.doris.nereids.exceptions.AnalysisException e) { + throw new org.apache.doris.nereids.exceptions.AnalysisException("In where clause '" + + expr.toSql() + "', " + + Utils.convertFirstChar(e.getMessage())); + } + return ExpressionTranslator.translate(expression, context); + } + + private void checkJobProperties() throws UserException { + Optional optional = jobProperties.keySet().stream().filter( + entity -> !PROPERTIES_SET.contains(entity)).findFirst(); + if (optional.isPresent()) { + throw new AnalysisException(optional.get() + " is invalid property"); + } + + desiredConcurrentNum = ((Long) Util.getLongPropertyOrDefault( + jobProperties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY), + Config.max_routine_load_task_concurrent_num, DESIRED_CONCURRENT_NUMBER_PRED, + DESIRED_CONCURRENT_NUMBER_PROPERTY + " must be greater than 0")).intValue(); + + maxErrorNum = Util.getLongPropertyOrDefault(jobProperties.get(MAX_ERROR_NUMBER_PROPERTY), + RoutineLoadJob.DEFAULT_MAX_ERROR_NUM, MAX_ERROR_NUMBER_PRED, + MAX_ERROR_NUMBER_PROPERTY + " should >= 0"); + + maxFilterRatio = Util.getDoublePropertyOrDefault(jobProperties.get(MAX_FILTER_RATIO_PROPERTY), + RoutineLoadJob.DEFAULT_MAX_FILTER_RATIO, MAX_FILTER_RATIO_PRED, + MAX_FILTER_RATIO_PROPERTY + " should between 0 and 1"); + + maxBatchIntervalS = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_INTERVAL_SEC_PROPERTY), + RoutineLoadJob.DEFAULT_MAX_INTERVAL_SECOND, MAX_BATCH_INTERVAL_PRED, + MAX_BATCH_INTERVAL_SEC_PROPERTY + " should >= 1"); + + maxBatchRows = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_ROWS_PROPERTY), + RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS, MAX_BATCH_ROWS_PRED, + MAX_BATCH_ROWS_PROPERTY + " should > 200000"); + + maxBatchSizeBytes = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_SIZE_PROPERTY), + RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE, MAX_BATCH_SIZE_PRED, + MAX_BATCH_SIZE_PROPERTY + " should between 100MB and 10GB"); + + strictMode = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.STRICT_MODE), + RoutineLoadJob.DEFAULT_STRICT_MODE, + LoadStmt.STRICT_MODE + " should be a boolean"); + execMemLimit = Util.getLongPropertyOrDefault(jobProperties.get(EXEC_MEM_LIMIT_PROPERTY), + RoutineLoadJob.DEFAULT_EXEC_MEM_LIMIT, EXEC_MEM_LIMIT_PRED, + EXEC_MEM_LIMIT_PROPERTY + " must be greater than 0"); + + sendBatchParallelism = ((Long) Util.getLongPropertyOrDefault(jobProperties.get(SEND_BATCH_PARALLELISM), + ConnectContext.get().getSessionVariable().getSendBatchParallelism(), SEND_BATCH_PARALLELISM_PRED, + SEND_BATCH_PARALLELISM + " must be greater than 0")).intValue(); + 
loadToSingleTablet = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET), + RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET, + LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean"); + + String encloseStr = jobProperties.get(LoadStmt.KEY_ENCLOSE); + if (encloseStr != null) { + if (encloseStr.length() != 1) { + throw new AnalysisException("enclose must be single-char"); + } else { + enclose = encloseStr.getBytes()[0]; + } + } + String escapeStr = jobProperties.get(LoadStmt.KEY_ESCAPE); + if (escapeStr != null) { + if (escapeStr.length() != 1) { + throw new AnalysisException("escape must be single-char"); + } else { + escape = escapeStr.getBytes()[0]; + } + } + + String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP); + if (!StringUtils.isEmpty(inputWorkloadGroupStr)) { + this.workloadGroupId = Env.getCurrentEnv().getWorkloadGroupMgr() + .getWorkloadGroup(ConnectContext.get().getCurrentUserIdentity(), inputWorkloadGroupStr); + } + + if (ConnectContext.get() != null) { + timezone = ConnectContext.get().getSessionVariable().getTimeZone(); + } + timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(LoadStmt.TIMEZONE, timezone)); + + format = jobProperties.get(FORMAT); + if (format != null) { + if (format.equalsIgnoreCase("csv")) { + format = ""; // if it's not json, then it means csv, so set empty + } else if (format.equalsIgnoreCase("json")) { + format = "json"; + jsonPaths = jobProperties.getOrDefault(JSONPATHS, ""); + jsonRoot = jobProperties.getOrDefault(JSONROOT, ""); + stripOuterArray = Boolean.parseBoolean(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false")); + numAsString = Boolean.parseBoolean(jobProperties.getOrDefault(NUM_AS_STRING, "false")); + fuzzyParse = Boolean.parseBoolean(jobProperties.getOrDefault(FUZZY_PARSE, "false")); + } else { + throw new UserException("Format type is invalid. format=`" + format + "`"); + } + } else { + format = "csv"; // default csv + } + } + + private void checkDataSourceProperties() throws UserException { + this.dataSourceProperties.setTimezone(this.timezone); + this.dataSourceProperties.analyze(); + } + + /** + * make legacy create routine load statement after validation by nereids + * @return legacy create routine load statement + */ + public CreateRoutineLoadStmt translateToLegacyStmt(ConnectContext ctx) { + return new CreateRoutineLoadStmt(labelNameInfo.transferToLabelName(), dbName, name, tableName, null, + ctx.getStatementContext().getOriginStatement(), ctx.getUserIdentity(), + jobProperties, typeName, routineLoadDesc, + desiredConcurrentNum, maxErrorNum, maxFilterRatio, maxBatchIntervalS, maxBatchRows, maxBatchSizeBytes, + execMemLimit, sendBatchParallelism, timezone, format, jsonPaths, jsonRoot, enclose, escape, workloadGroupId, + loadToSingleTablet, strictMode, isPartialUpdate, stripOuterArray, numAsString, fuzzyParse, + dataSourceProperties + ); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/LabelNameInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/LabelNameInfo.java new file mode 100644 index 00000000000000..c314b5bce0defe --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/LabelNameInfo.java @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/TableName.java +// and modified by Doris + +package org.apache.doris.nereids.trees.plans.commands.info; + +import org.apache.doris.analysis.LabelName; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.FeNameFormat; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Strings; + +import java.util.Objects; + +/** + * Label name info + */ +public class LabelNameInfo { + private String label; + private String db; + + public LabelNameInfo() { + + } + + /** + * LabelNameInfo + * @param db dbName + * @param label labelName + */ + public LabelNameInfo(String db, String label) { + Objects.requireNonNull(label, "require label object"); + this.label = label; + if (Env.isStoredTableNamesLowerCase()) { + this.label = label.toLowerCase(); + } + this.db = db; + } + + /** + * validate labelNameInfo + * @param ctx ctx + */ + public void validate(ConnectContext ctx) throws org.apache.doris.common.AnalysisException { + if (Strings.isNullOrEmpty(db)) { + db = ctx.getDatabase(); + if (Strings.isNullOrEmpty(db)) { + throw new AnalysisException("No database selected"); + } + } + + if (Strings.isNullOrEmpty(label)) { + throw new AnalysisException("Label name is null"); + } + + FeNameFormat.checkLabel(label); + } + + /** + * get db name + * @return dbName + */ + public String getDb() { + return db; + } + + /** + * set a new database name + * @param db new database name + */ + public void setDb(String db) { + this.db = db; + } + + /** + * get label name + * @return labelName + */ + public String getLabel() { + return label; + } + + /** + * transferToLabelName + * @return LabelName + */ + public LabelName transferToLabelName() { + return new LabelName(db, label); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/CreateRoutineLoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/CreateRoutineLoadCommand.java new file mode 100644 index 00000000000000..da7ab86c17abac --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/CreateRoutineLoadCommand.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.load; + +import org.apache.doris.analysis.CreateRoutineLoadStmt; +import org.apache.doris.catalog.Env; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.Command; +import org.apache.doris.nereids.trees.plans.commands.NoForward; +import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +import java.util.Objects; + +/** + Create routine Load statement, continually load data from a streaming app + + syntax: + CREATE ROUTINE LOAD [database.]name on table + [load properties] + [PROPERTIES + ( + desired_concurrent_number = xxx, + max_error_number = xxx, + k1 = v1, + ... + kn = vn + )] + FROM type of routine load + [( + k1 = v1, + ... + kn = vn + )] + + load properties: + load property [[,] load property] ... + + load property: + column separator | columns_mapping | partitions | where + + column separator: + COLUMNS TERMINATED BY xxx + columns_mapping: + COLUMNS (c1, c2, c3 = c1 + c2) + partitions: + PARTITIONS (p1, p2, p3) + where: + WHERE c1 > 1 + + type of routine load: + KAFKA +*/ +public class CreateRoutineLoadCommand extends Command implements NoForward { + CreateRoutineLoadInfo createRoutineLoadInfo; + + public CreateRoutineLoadCommand(CreateRoutineLoadInfo createRoutineLoadInfo) { + super(PlanType.CREATE_ROUTINE_LOAD_COMMAND); + this.createRoutineLoadInfo = Objects.requireNonNull(createRoutineLoadInfo, "require CreateRoutineLoadInfo object"); + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + createRoutineLoadInfo.validate(ctx); + CreateRoutineLoadStmt createRoutineLoadStmt = createRoutineLoadInfo.translateToLegacyStmt(ctx); + Env.getCurrentEnv().getRoutineLoadManager().createRoutineLoadJob(createRoutineLoadStmt); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitCreateRoutineLoadCommand(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadColumnClause.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadColumnClause.java new file mode 100644 index 00000000000000..a2bc7acbabe076 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadColumnClause.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.load; + +import java.util.List; + +/** + * load LoadColumnClause for nereids + */ +public class LoadColumnClause implements LoadProperty { + private List columns; + + public LoadColumnClause(List columns) { + this.columns = columns; + } + + public List getColumns() { + return columns; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadColumnDesc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadColumnDesc.java new file mode 100644 index 00000000000000..46b8013b159fa4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadColumnDesc.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.load; + +import org.apache.doris.nereids.trees.expressions.Expression; + +/** + * load LoadColumnDesc for nereids + */ +public class LoadColumnDesc { + private final String columnName; + private Expression expression; + + public LoadColumnDesc(String column, Expression expression) { + this.columnName = column; + this.expression = expression; + } + + public LoadColumnDesc(String column) { + this.columnName = column; + } + + public String getColumnName() { + return columnName; + } + + public Expression getExpression() { + return expression; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadDeleteOnClause.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadDeleteOnClause.java new file mode 100644 index 00000000000000..4ab77f8e9bb300 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadDeleteOnClause.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.load; + +import org.apache.doris.nereids.trees.expressions.Expression; + +/** + * load LoadDeleteOnClause for nereids + */ +public class LoadDeleteOnClause implements LoadProperty { + private final Expression expression; + + public LoadDeleteOnClause(Expression expression) { + this.expression = expression; + } + + public Expression getExpression() { + return expression; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadPartitionNames.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadPartitionNames.java new file mode 100644 index 00000000000000..77b06bc695c75b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadPartitionNames.java @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.commands.load; + +import org.apache.doris.common.AnalysisException; + +import com.google.common.base.Strings; +import com.google.gson.annotations.SerializedName; + +import java.util.List; + +/** + * load LoadPartitionNames for nereids + */ +public class LoadPartitionNames implements LoadProperty { + @SerializedName(value = "partitionNames") + private final List partitionNames; + // true if these partitions are temp partitions + @SerializedName(value = "isTemp") + private final boolean isTemp; + private final boolean isStar; + private final long count; + + public LoadPartitionNames(boolean isTemp, List partitionNames) { + this.isTemp = isTemp; + this.partitionNames = partitionNames; + this.isStar = false; + this.count = 0; + } + + public List getPartitionNames() { + return partitionNames; + } + + public boolean isTemp() { + return isTemp; + } + + @Override + public void validate() throws AnalysisException { + if (isStar && count > 0) { + throw new AnalysisException("All partition and partition count couldn't be set at the same time."); + } + if (isStar || count > 0) { + return; + } + if (partitionNames == null || partitionNames.isEmpty()) { + throw new AnalysisException("No partition specified in partition lists"); + } + // check if partition name is not empty string + if (partitionNames.stream().anyMatch(Strings::isNullOrEmpty)) { + throw new AnalysisException("there are empty partition name"); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadProperty.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadProperty.java new file mode 100644 index 00000000000000..c21ba1c5774bbb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadProperty.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/ParseNode.java +// and modified by Doris + +package org.apache.doris.nereids.trees.plans.commands.load; + +import org.apache.doris.common.AnalysisException; + +/** + * load LoadProperty for nereids + */ +public interface LoadProperty { + + /** + * Perform semantic validate of node and all of its children. + * Throws exception if any errors found. 
+ */ + default void validate() throws AnalysisException { + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadSeparator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadSeparator.java new file mode 100644 index 00000000000000..a1b31cb721638a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadSeparator.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.load; + +/** + * load separator for nereids + */ +public class LoadSeparator implements LoadProperty { + private final String oriSeparator; + + public LoadSeparator(String oriSeparator) { + this.oriSeparator = oriSeparator; + } + + public String getOriSeparator() { + return oriSeparator; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadSequenceClause.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadSequenceClause.java new file mode 100644 index 00000000000000..e9545382fe7dc3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadSequenceClause.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.load;
+
+/**
+ * load LoadSequenceClause for nereids
+ */
+public class LoadSequenceClause implements LoadProperty {
+    private final String sequenceColName;
+
+    public LoadSequenceClause(String sequenceColName) {
+        this.sequenceColName = sequenceColName;
+    }
+
+    public String getSequenceColName() {
+        return sequenceColName;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadWhereClause.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadWhereClause.java
new file mode 100644
index 00000000000000..627f03e89e5c9b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/load/LoadWhereClause.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.load;
+
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+/**
+ * load LoadWhereClause for nereids
+ */
+public class LoadWhereClause implements LoadProperty {
+    private final Expression expression;
+
+    private final boolean isPreceding;
+
+    public LoadWhereClause(Expression expression, boolean isPreceding) {
+        this.expression = expression;
+        this.isPreceding = isPreceding;
+    }
+
+    public Expression getExpression() {
+        return expression;
+    }
+
+    public boolean isPreceding() {
+        return isPreceding;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index 815c5c67030c34..f6cdf535fbf43b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -99,6 +99,7 @@
 import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.load.CreateRoutineLoadCommand;
 import org.apache.doris.nereids.trees.plans.commands.refresh.RefreshCatalogCommand;
 
 /** CommandVisitor. */
@@ -438,4 +439,8 @@ default R visitShowPrivilegesCommand(ShowPrivilegesCommand showPrivilegesCommand
     default R visitShowTabletsBelongCommand(ShowTabletsBelongCommand showTabletBelongCommand, C context) {
         return visitCommand(showTabletBelongCommand, context);
     }
+
+    default R visitCreateRoutineLoadCommand(CreateRoutineLoadCommand createRoutineLoadCommand, C context) {
+        return visitCommand(createRoutineLoadCommand, context);
+    }
 }
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
index 7735867c749049..d971a298ffb39d 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
@@ -157,7 +157,7 @@ suite("test_routine_load_condition","p0") {
                 }
                 log.info("reason of state changed: ${res[0][11].toString()}".toString())
                 def json = parseJson(res[0][11])
-                assertEquals("(`k12` >= date_sub(curdate(), INTERVAL 2 DAY))", json.whereExpr.toString())
+                assertEquals("(k12 >= CAST(days_sub(current_date(), 2) AS datetimev2(0)))", json.whereExpr.toString())
                 break;
             }
             while (true) {
@@ -180,4 +180,4 @@ suite("test_routine_load_condition","p0") {
             sql "DROP TABLE IF EXISTS ${tableName}"
         }
     }
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/load_p2/routine_load/test_routine_load.groovy b/regression-test/suites/load_p2/routine_load/test_routine_load.groovy
index 2b59c0f8e60d77..24cb5627f7f666 100644
--- a/regression-test/suites/load_p2/routine_load/test_routine_load.groovy
+++ b/regression-test/suites/load_p2/routine_load/test_routine_load.groovy
@@ -221,7 +221,7 @@ suite("test_routine_load_p2","p2,nonConcurrent") {
             sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
             def name = "routine_load_" + tableName
 
-            sql """
+            checkNereidsExecute("""
                 CREATE ROUTINE LOAD ${jobs[i]} ON ${name}
                 COLUMNS(${columns[i]}),
                 COLUMNS TERMINATED BY "|"
@@ -239,7 +239,7 @@ suite("test_routine_load_p2","p2,nonConcurrent") {
                 "kafka_topic" = "${topics[i]}",
                 "property.kafka_default_offsets" = "OFFSET_BEGINNING"
             );
-            """
+            """)
            sql "sync"
            i++
        }
@@ -2380,4 +2380,4 @@ suite("test_routine_load_p2","p2,nonConcurrent") {
             }
         }
     }
-}
\ No newline at end of file
+}
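
Note for readers of the patch: the sketch below is not part of the diff. It is a minimal, hypothetical illustration of how the LoadProperty implementations added above are intended to be exercised — simple properties such as LoadSeparator fall through the default no-op validate(), while LoadPartitionNames overrides it with real checks. The class LoadPropertyValidationSketch and its validateAll helper are invented names for this illustration; only LoadProperty, LoadSeparator, LoadPartitionNames, and AnalysisException come from the codebase.

// Hypothetical usage sketch, not part of the patch; compiles only inside the FE module.
import java.util.Arrays;
import java.util.List;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.nereids.trees.plans.commands.load.LoadPartitionNames;
import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty;
import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator;

public class LoadPropertyValidationSketch {

    // Invented helper: lets every collected load property validate itself.
    static void validateAll(List<LoadProperty> properties) throws AnalysisException {
        for (LoadProperty property : properties) {
            property.validate();
        }
    }

    public static void main(String[] args) throws AnalysisException {
        List<LoadProperty> properties = Arrays.asList(
                new LoadSeparator("|"),                                  // uses the default no-op validate()
                new LoadPartitionNames(false, Arrays.asList("p1", "p2")) // overrides validate() with real checks
        );
        validateAll(properties);
        // new LoadPartitionNames(false, Arrays.asList("")).validate() would throw AnalysisException
    }
}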