Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[Improvement](Nereids) Support create routine load command #43930

Merged
merged 17 commits into from
Nov 27, 2024
25 changes: 11 additions & 14 deletions fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,11 @@ statementBase
| supportedUnsetStatement #supportedUnsetStatementAlias
| supportedRefreshStatement #supportedRefreshStatementAlias
| supportedShowStatement #supportedShowStatementAlias
| supportedLoadStatement #supportedLoadStatementAlias
| supportedRecoverStatement #supportedRecoverStatementAlias
| supportedLoadStatement #supportedLoadfStatementAlias
| unsupportedStatement #unsupported
;



unsupportedStatement
: unsupportedUseStatement
| unsupportedDmlStatement
Expand Down Expand Up @@ -232,11 +230,12 @@ supportedShowStatement
| SHOW TABLE tableId=INTEGER_VALUE #showTableId
| SHOW WHITELIST #showWhitelist
| SHOW TABLETS BELONG
tabletIds+=INTEGER_VALUE (COMMA tabletIds+=INTEGER_VALUE)* #showTabletsBelong
tabletIds+=INTEGER_VALUE (COMMA tabletIds+=INTEGER_VALUE)* #showTabletsBelong
;

// supportedLoadStatement: two labeled alternatives.
//   #sync                   - the bare SYNC keyword
//   #createRoutineLoadAlias - delegates to the createRoutineLoad rule
supportedLoadStatement
: SYNC #sync
| createRoutineLoad #createRoutineLoadAlias
;

unsupportedOtherStatement
Expand Down Expand Up @@ -351,6 +350,14 @@ unsupportedShowStatement
| SHOW WARM UP JOB wildWhere? #showWarmUpJob
;

// createRoutineLoad: CREATE ROUTINE LOAD statement. In order, the rule accepts:
//   - label:            a multipart identifier naming the job
//   - ON table:         optional single identifier
//   - WITH ...:         optional, one of APPEND | DELETE | MERGE
//   - loadProperty:     optional comma-separated list of load property clauses
//   - propertyClause:   optional
//   - FROM type:        required identifier
//   - customProperties: required, parenthesized propertyItemList
//   - commentSpec:      optional trailing comment
createRoutineLoad
: CREATE ROUTINE LOAD label=multipartIdentifier (ON table=identifier)?
(WITH (APPEND | DELETE | MERGE))?
(loadProperty (COMMA loadProperty)*)? propertyClause? FROM type=identifier
LEFT_PAREN customProperties=propertyItemList RIGHT_PAREN
commentSpec?
;

unsupportedLoadStatement
: LOAD mysqlDataDesc
(PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
Expand All @@ -362,11 +369,6 @@ unsupportedLoadStatement
| STOP SYNC JOB name=multipartIdentifier #stopDataSyncJob
| RESUME SYNC JOB name=multipartIdentifier #resumeDataSyncJob
| PAUSE SYNC JOB name=multipartIdentifier #pauseDataSyncJob
| CREATE ROUTINE LOAD label=multipartIdentifier (ON table=identifier)?
(WITH (APPEND | DELETE | MERGE))?
(loadProperty (COMMA loadProperty)*)? propertyClause? FROM type=identifier
LEFT_PAREN customProperties=propertyItemList RIGHT_PAREN
commentSpec? #createRoutineLoadJob
| PAUSE ROUTINE LOAD FOR label=multipartIdentifier #pauseRoutineLoad
| PAUSE ALL ROUTINE LOAD #pauseAllRoutineLoad
| RESUME ROUTINE LOAD FOR label=multipartIdentifier #resumeRoutineLoad
Expand All @@ -376,11 +378,6 @@ unsupportedLoadStatement
| SHOW ROUTINE LOAD TASK ((FROM | IN) database=identifier)? wildWhere? #showRoutineLoadTask
| SHOW ALL? CREATE ROUTINE LOAD FOR label=multipartIdentifier #showCreateRoutineLoad
| SHOW CREATE LOAD FOR label=multipartIdentifier #showCreateLoad
| importSequenceStatement #importSequenceStatementAlias
| importPrecedingFilterStatement #importPrecedingFilterStatementAlias
| importWhereStatement #importWhereStatementAlias
| importDeleteOnStatement #importDeleteOnStatementAlias
| importColumnsStatement #importColumnsStatementAlias
;

loadProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -229,6 +230,51 @@ public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNo
}
}

/**
 * Builds the statement on behalf of the Nereids path.
 *
 * <p>Every argument is stored as given; this constructor performs no validation
 * or derivation of its own. The original statement and the user identity are
 * recorded through {@code setOrigStmt} and {@code setUserInfo}; everything else
 * is assigned directly to the field of the corresponding name.
 */
public CreateRoutineLoadStmt(LabelName labelName, String dbName, String name, String tableName,
        List<ParseNode> loadPropertyList, OriginStatement origStmt, UserIdentity userIdentity,
        Map<String, String> jobProperties, String typeName, RoutineLoadDesc routineLoadDesc,
        int desireTaskConcurrentNum, long maxErrorNum, double maxFilterRatio, long maxBatchIntervalS,
        long maxBatchRows, long maxBatchSizeBytes, long execMemLimit, int sendBatchParallelism, String timezone,
        String format, String jsonPaths, String jsonRoot, byte enclose, byte escape, long workloadGroupId,
        boolean loadToSingleTablet, boolean strictMode, boolean isPartialUpdate, boolean stripOuterArray,
        boolean numAsString, boolean fuzzyParse, AbstractDataSourceProperties dataSourceProperties) {
    // Job identity and target.
    this.labelName = labelName;
    this.dbName = dbName;
    this.name = name;
    this.tableName = tableName;

    // Statement-level bookkeeping.
    this.setOrigStmt(origStmt);
    this.setUserInfo(userIdentity);

    // Load description, properties and data source.
    this.loadPropertyList = loadPropertyList;
    this.routineLoadDesc = routineLoadDesc;
    this.jobProperties = jobProperties;
    this.typeName = typeName;
    this.dataSourceProperties = dataSourceProperties;

    // Numeric job settings.
    this.desiredConcurrentNum = desireTaskConcurrentNum;
    this.maxErrorNum = maxErrorNum;
    this.maxFilterRatio = maxFilterRatio;
    this.maxBatchIntervalS = maxBatchIntervalS;
    this.maxBatchRows = maxBatchRows;
    this.maxBatchSizeBytes = maxBatchSizeBytes;
    this.execMemLimit = execMemLimit;
    this.sendBatchParallelism = sendBatchParallelism;
    this.workloadGroupId = workloadGroupId;

    // Input format settings.
    this.timezone = timezone;
    this.format = format;
    this.jsonPaths = jsonPaths;
    this.jsonRoot = jsonRoot;
    this.enclose = enclose;
    this.escape = escape;

    // Boolean switches.
    this.loadToSingleTablet = loadToSingleTablet;
    this.strictMode = strictMode;
    this.isPartialUpdate = isPartialUpdate;
    this.stripOuterArray = stripOuterArray;
    this.numAsString = numAsString;
    this.fuzzyParse = fuzzyParse;
}

public String getName() {
return name;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public Separator(String separator) {
this.separator = null;
}

/**
 * Creates a separator from two already-known values, storing each as given.
 *
 * @param separator value assigned to the {@code separator} field
 * @param oriSeparator value assigned to the {@code oriSeparator} field
 */
public Separator(String separator, String oriSeparator) {
    this.separator = separator;
    this.oriSeparator = oriSeparator;
}

public String getOriSeparator() {
return oriSeparator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.doris.nereids.DorisParser.ConstantContext;
import org.apache.doris.nereids.DorisParser.CreateMTMVContext;
import org.apache.doris.nereids.DorisParser.CreateProcedureContext;
import org.apache.doris.nereids.DorisParser.CreateRoutineLoadContext;
import org.apache.doris.nereids.DorisParser.CreateRowPolicyContext;
import org.apache.doris.nereids.DorisParser.CreateTableContext;
import org.apache.doris.nereids.DorisParser.CreateTableLikeContext;
Expand Down Expand Up @@ -120,6 +121,12 @@
import org.apache.doris.nereids.DorisParser.IdentifierContext;
import org.apache.doris.nereids.DorisParser.IdentifierListContext;
import org.apache.doris.nereids.DorisParser.IdentifierSeqContext;
import org.apache.doris.nereids.DorisParser.ImportColumnsContext;
import org.apache.doris.nereids.DorisParser.ImportDeleteOnContext;
import org.apache.doris.nereids.DorisParser.ImportPartitionsContext;
import org.apache.doris.nereids.DorisParser.ImportPrecedingFilterContext;
import org.apache.doris.nereids.DorisParser.ImportSequenceContext;
import org.apache.doris.nereids.DorisParser.ImportWhereContext;
import org.apache.doris.nereids.DorisParser.InPartitionDefContext;
import org.apache.doris.nereids.DorisParser.IndexDefContext;
import org.apache.doris.nereids.DorisParser.IndexDefsContext;
Expand All @@ -135,6 +142,7 @@
import org.apache.doris.nereids.DorisParser.LateralViewContext;
import org.apache.doris.nereids.DorisParser.LessThanPartitionDefContext;
import org.apache.doris.nereids.DorisParser.LimitClauseContext;
import org.apache.doris.nereids.DorisParser.LoadPropertyContext;
import org.apache.doris.nereids.DorisParser.LogicalBinaryContext;
import org.apache.doris.nereids.DorisParser.LogicalNotContext;
import org.apache.doris.nereids.DorisParser.MapLiteralContext;
Expand Down Expand Up @@ -187,6 +195,7 @@
import org.apache.doris.nereids.DorisParser.SelectClauseContext;
import org.apache.doris.nereids.DorisParser.SelectColumnClauseContext;
import org.apache.doris.nereids.DorisParser.SelectHintContext;
import org.apache.doris.nereids.DorisParser.SeparatorContext;
import org.apache.doris.nereids.DorisParser.SetCharsetContext;
import org.apache.doris.nereids.DorisParser.SetCollateContext;
import org.apache.doris.nereids.DorisParser.SetDefaultStorageVaultContext;
Expand Down Expand Up @@ -517,6 +526,7 @@
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableLikeInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateViewInfo;
Expand All @@ -529,6 +539,7 @@
import org.apache.doris.nereids.trees.plans.commands.info.GeneratedColumnDesc;
import org.apache.doris.nereids.trees.plans.commands.info.InPartition;
import org.apache.doris.nereids.trees.plans.commands.info.IndexDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo;
import org.apache.doris.nereids.trees.plans.commands.info.LessThanPartition;
import org.apache.doris.nereids.trees.plans.commands.info.MTMVPartitionDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionDefinition;
Expand All @@ -553,6 +564,15 @@
import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.commands.load.CreateRoutineLoadCommand;
import org.apache.doris.nereids.trees.plans.commands.load.LoadColumnClause;
import org.apache.doris.nereids.trees.plans.commands.load.LoadColumnDesc;
import org.apache.doris.nereids.trees.plans.commands.load.LoadDeleteOnClause;
import org.apache.doris.nereids.trees.plans.commands.load.LoadPartitionNames;
import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty;
import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator;
import org.apache.doris.nereids.trees.plans.commands.load.LoadSequenceClause;
import org.apache.doris.nereids.trees.plans.commands.load.LoadWhereClause;
import org.apache.doris.nereids.trees.plans.commands.refresh.RefreshCatalogCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
Expand Down Expand Up @@ -1436,6 +1456,108 @@ public LogicalSubQueryAlias<Plan> visitAliasQuery(AliasQueryContext ctx) {
});
}

/**
 * Converts a single load-property clause of a routine load statement into its
 * {@link LoadProperty} representation.
 *
 * <p>The concrete type of {@code ctx} selects the result: separator, column
 * list, DELETE ON, partition list, preceding filter, sequence column or WHERE.
 *
 * @param ctx the parsed load-property clause
 * @return the matching {@link LoadProperty}, or {@code null} when {@code ctx}
 *         is none of the context types handled here
 */
public LoadProperty visitLoadProperty(LoadPropertyContext ctx) {
    if (ctx instanceof SeparatorContext) {
        SeparatorContext separatorCtx = (SeparatorContext) ctx;
        return new LoadSeparator(stripQuotes(separatorCtx.STRING_LITERAL().getText()));
    }
    if (ctx instanceof ImportColumnsContext) {
        ImportColumnsContext columnsCtx = (ImportColumnsContext) ctx;
        List<LoadColumnDesc> columnDescs = new ArrayList<>();
        for (DorisParser.ImportColumnDescContext columnCtx
                : columnsCtx.importColumnsStatement().importColumnDesc()) {
            String columnName = columnCtx.name.getText();
            // A column may optionally carry a mapping expression.
            columnDescs.add(columnCtx.booleanExpression() != null
                    ? new LoadColumnDesc(columnName, getExpression(columnCtx.booleanExpression()))
                    : new LoadColumnDesc(columnName));
        }
        return new LoadColumnClause(columnDescs);
    }
    if (ctx instanceof ImportDeleteOnContext) {
        ImportDeleteOnContext deleteOnCtx = (ImportDeleteOnContext) ctx;
        return new LoadDeleteOnClause(
                getExpression(deleteOnCtx.importDeleteOnStatement().booleanExpression()));
    }
    if (ctx instanceof ImportPartitionsContext) {
        ImportPartitionsContext partitionsCtx = (ImportPartitionsContext) ctx;
        Pair<Boolean, List<String>> spec = visitPartitionSpec(partitionsCtx.partitionSpec());
        return new LoadPartitionNames(spec.first, spec.second);
    }
    if (ctx instanceof ImportPrecedingFilterContext) {
        ImportPrecedingFilterContext filterCtx = (ImportPrecedingFilterContext) ctx;
        // 'true' marks this where-clause as the preceding filter variant.
        return new LoadWhereClause(
                getExpression(filterCtx.importPrecedingFilterStatement().booleanExpression()), true);
    }
    if (ctx instanceof ImportSequenceContext) {
        ImportSequenceContext sequenceCtx = (ImportSequenceContext) ctx;
        return new LoadSequenceClause(sequenceCtx.importSequenceStatement().identifier().getText());
    }
    if (ctx instanceof ImportWhereContext) {
        ImportWhereContext whereCtx = (ImportWhereContext) ctx;
        return new LoadWhereClause(
                getExpression(whereCtx.importWhereStatement().booleanExpression()), false);
    }
    return null;
}

/**
 * Builds a {@link CreateRoutineLoadCommand} from a parsed CREATE ROUTINE LOAD statement.
 *
 * <p>The label may have one, two or three parts ({@code [ctl.][db.]label}); with three
 * parts the first one is not used. A one-part label is rejected when the current
 * database of the connect context is empty. Each kind of load-property clause may
 * appear at most once.
 *
 * @param ctx the parsed statement
 * @return the command wrapping the collected {@link CreateRoutineLoadInfo}
 * @throws AnalysisException on a missing current database, a malformed label, an
 *         unrecognized load-property clause, or a repeated load-property clause
 */
@Override
public LogicalPlan visitCreateRoutineLoad(CreateRoutineLoadContext ctx) {
    List<String> labelParts = visitMultipartIdentifier(ctx.label);
    boolean noCurrentDb = ConnectContext.get().getDatabase().isEmpty();
    String labelDbName = null;
    String labelName;
    switch (labelParts.size()) {
        case 1:
            if (noCurrentDb) {
                throw new AnalysisException("Current database is not set.");
            }
            labelName = labelParts.get(0);
            break;
        case 2:
            labelDbName = labelParts.get(0);
            labelName = labelParts.get(1);
            break;
        case 3:
            labelDbName = labelParts.get(1);
            labelName = labelParts.get(2);
            break;
        default:
            throw new AnalysisException("labelParts in load should be [ctl.][db.]label");
    }
    LabelNameInfo jobLabelInfo = new LabelNameInfo(labelDbName, labelName);

    String tableName = ctx.table == null ? null : ctx.table.getText();

    // NOTICE: we should not generate immutable map here, because it will be modified when analyzing.
    Map<String, String> properties = Maps.newHashMap();
    if (ctx.propertyClause() != null) {
        properties.putAll(visitPropertyClause(ctx.propertyClause()));
    }
    String type = ctx.type.getText();
    // NOTICE: we should not generate immutable map here, because it will be modified when analyzing.
    Map<String, String> customProperties = Maps.newHashMap();
    if (ctx.customProperties != null) {
        customProperties.putAll(visitPropertyItemList(ctx.customProperties));
    }

    // APPEND unless the WITH clause explicitly names DELETE or MERGE.
    LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND;
    if (ctx.WITH() != null && ctx.DELETE() != null) {
        mergeType = LoadTask.MergeType.DELETE;
    } else if (ctx.WITH() != null && ctx.MERGE() != null) {
        mergeType = LoadTask.MergeType.MERGE;
    }
    String comment = visitCommentSpec(ctx.commentSpec());

    // Keyed by the clause's class name so each clause kind is accepted only once.
    Map<String, LoadProperty> loadPropertyMap = new HashMap<>();
    for (DorisParser.LoadPropertyContext loadPropertyCtx : ctx.loadProperty()) {
        LoadProperty loadProperty = visitLoadProperty(loadPropertyCtx);
        if (loadProperty == null) {
            throw new AnalysisException("invalid clause of routine load");
        }
        String clauseKey = loadProperty.getClass().getName();
        if (loadPropertyMap.putIfAbsent(clauseKey, loadProperty) != null) {
            throw new AnalysisException("repeat setting of clause load property: "
                    + clauseKey);
        }
    }

    return new CreateRoutineLoadCommand(new CreateRoutineLoadInfo(jobLabelInfo, tableName,
            loadPropertyMap, properties, type, customProperties, mergeType, comment));
}

@Override
public Command visitCreateRowPolicy(CreateRowPolicyContext ctx) {
FilterType filterType = FilterType.of(ctx.type.getText());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,5 +215,6 @@ public enum PlanType {
RECOVER_DATABASE_COMMAND,
RECOVER_TABLE_COMMAND,
RECOVER_PARTITION_COMMAND,
REPLAY_COMMAND
REPLAY_COMMAND,
CREATE_ROUTINE_LOAD_COMMAND
}
Loading
Loading