Skip to content

Commit

Permalink
[feature](nereids)support CreateMaterializedViewCommand in nereids
Browse files Browse the repository at this point in the history
  • Loading branch information
starocean999 committed Dec 19, 2024
1 parent 9d23c13 commit e411b92
Show file tree
Hide file tree
Showing 12 changed files with 972 additions and 17 deletions.
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.PropertyAnalyzer.RewriteProperty;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.nereids.trees.plans.commands.CreateMaterializedViewCommand;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AlterMTMV;
import org.apache.doris.persist.AlterViewInfo;
Expand Down Expand Up @@ -129,6 +130,18 @@ public void processCreateMaterializedView(CreateMaterializedViewStmt stmt)
((MaterializedViewHandler) materializedViewHandler).processCreateMaterializedView(stmt, db, olapTable);
}

public void processCreateMaterializedView(CreateMaterializedViewCommand command)
throws DdlException, AnalysisException, MetaNotFoundException {
String tableName = command.getBaseIndexName();
// check db
String dbName = command.getDBName();
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
Env.getCurrentInternalCatalog().checkAvailableCapacity(db);

OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP);
((MaterializedViewHandler) materializedViewHandler).processCreateMaterializedView(command, db, olapTable);
}

public void processDropMaterializedView(DropMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException {
TableName tableName = stmt.getTableName();
String dbName = tableName.getDb();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.commands.CreateMaterializedViewCommand;
import org.apache.doris.persist.BatchDropInfo;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.EditLog;
Expand Down Expand Up @@ -232,6 +233,65 @@ public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause
}
}

/**
* There are 2 main steps in this function.
* Step1: validate the request.
* Step1.1: semantic analysis: the name of olapTable must be same as the base table name in createMvCommand.
* Step1.2: base table validation: the status of base table and partition could be NORMAL.
* Step1.3: materialized view validation: the name and columns of mv is checked.
* Step2: create mv job
* @param createMvCommand
* @param db
* @param olapTable
* @throws DdlException
*/
public void processCreateMaterializedView(CreateMaterializedViewCommand createMvCommand, Database db,
OlapTable olapTable) throws DdlException, AnalysisException {
// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
olapTable.writeLockOrDdlException();
try {
olapTable.checkNormalStateForAlter();
if (olapTable.existTempPartitions()) {
throw new DdlException("Can not alter table when there are temp partitions in table");
}

// Step1.1: semantic analysis
// TODO(ML): support the materialized view as base index
if (!createMvCommand.getBaseIndexName().equals(olapTable.getName())) {
throw new DdlException("The name of table in from clause must be same as the name of alter table");
}
// Step1.2: base table validation
String baseIndexName = createMvCommand.getBaseIndexName();
String mvIndexName = createMvCommand.getMVName();
LOG.info("process add materialized view[{}] based on [{}]", mvIndexName, baseIndexName);

// avoid conflict against with batch add rollup job
Preconditions.checkState(olapTable.getState() == OlapTableState.NORMAL);

long baseIndexId = checkAndGetBaseIndex(baseIndexName, olapTable);
// Step1.3: mv clause validation
List<Column> mvColumns = checkAndPrepareMaterializedView(createMvCommand, olapTable);

// Step2: create mv job
RollupJobV2 rollupJobV2 =
createMaterializedViewJob(null, mvIndexName, baseIndexName, mvColumns,
createMvCommand.getWhereClauseItemColumn(olapTable),
createMvCommand.getProperties(), olapTable, db, baseIndexId,
createMvCommand.getMVKeysType(), createMvCommand.getOriginStatement());

addAlterJobV2(rollupJobV2);

olapTable.setState(OlapTableState.ROLLUP);

Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
LOG.info("finished to create materialized view job: {}", rollupJobV2.getJobId());
} finally {
olapTable.writeUnlock();
}
}

/**
* There are 2 main steps.
* Step1: validate the request
Expand Down Expand Up @@ -479,6 +539,181 @@ private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, Stri
return mvJob;
}

private List<Column> checkAndPrepareMaterializedView(CreateMaterializedViewCommand createMvCommand,
OlapTable olapTable)
throws DdlException {
// check if mv index already exists
if (olapTable.hasMaterializedIndex(createMvCommand.getMVName())) {
throw new DdlException("Materialized view[" + createMvCommand.getMVName() + "] already exists");
}
if (olapTable.getRowStoreCol() != null) {
throw new DdlException("RowStore table can't create materialized view.");
}
// check if mv columns are valid
// a. Aggregate table:
// 1. all slot's aggregationType must same with value mv column
// 2. all slot's isKey must same with mv column
// 3. value column'define expr must be slot (except all slot belong replace family)
// b. Unique table:
// 1. mv must not contain group expr
// 2. all slot's isKey same with mv column
// 3. mv must contain all key column
// c. Duplicate table:
// 1. Columns resolved by semantics are legal
// 2. Key column not allow float/double type.

// update mv columns
List<MVColumnItem> mvColumnItemList = createMvCommand.getMVColumnItemList();
List<Column> newMVColumns = Lists.newArrayList();

if (olapTable.getKeysType().isAggregationFamily()) {
if (olapTable.getKeysType() == KeysType.AGG_KEYS && createMvCommand.getMVKeysType() != KeysType.AGG_KEYS) {
throw new DdlException("The materialized view of aggregation table must has grouping columns");
}
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
&& createMvCommand.getMVKeysType() == KeysType.AGG_KEYS) {
// check b.1
throw new DdlException("The materialized view of unique table must not has grouping columns");
}

for (MVColumnItem mvColumnItem : mvColumnItemList) {
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
mvColumnItem.setIsKey(false);
for (String slotName : mvColumnItem.getBaseColumnNames()) {
if (olapTable
.getColumn(MaterializedIndexMeta
.normalizeName(CreateMaterializedViewStmt.mvColumnBreaker(slotName)))
.isKey()) {
mvColumnItem.setIsKey(true);
}
}
if (!mvColumnItem.isKey()) {
mvColumnItem.setAggregationType(AggregateType.REPLACE, true);
}
}

// check a.2 and b.2
for (String slotName : mvColumnItem.getBaseColumnNames()) {
if (olapTable
.getColumn(MaterializedIndexMeta
.normalizeName(CreateMaterializedViewStmt.mvColumnBreaker(slotName)))
.isKey() != mvColumnItem.isKey()) {
throw new DdlException("The mvItem[" + mvColumnItem.getName()
+ "]'s isKey must same with all slot, mvItem.isKey="
+ (mvColumnItem.isKey() ? "true" : "false"));
}
}

if (!mvColumnItem.isKey() && olapTable.getKeysType() == KeysType.AGG_KEYS) {
// check a.1
for (String slotName : mvColumnItem.getBaseColumnNames()) {
if (olapTable
.getColumn(MaterializedIndexMeta
.normalizeName(CreateMaterializedViewStmt.mvColumnBreaker(slotName)))
.getAggregationType() != mvColumnItem.getAggregationType()) {
throw new DdlException("The mvItem[" + mvColumnItem.getName()
+ "]'s AggregationType must same with all slot");
}
}

// check a.3
if (!mvColumnItem.getAggregationType().isReplaceFamily()
&& !(mvColumnItem.getDefineExpr() instanceof SlotRef)
&& !((mvColumnItem.getDefineExpr() instanceof CastExpr)
&& mvColumnItem.getDefineExpr().getChild(0) instanceof SlotRef)) {
throw new DdlException(
"The mvItem[" + mvColumnItem.getName() + "] require slot because it is value column");
}
}
newMVColumns.add(mvColumnItem.toMVColumn(olapTable));
}
} else {
for (MVColumnItem mvColumnItem : mvColumnItemList) {
Set<String> names = mvColumnItem.getBaseColumnNames();
if (names == null) {
throw new DdlException("Base columns is null");
}

newMVColumns.add(mvColumnItem.toMVColumn(olapTable));
}
}

for (Column column : newMVColumns) {
// check c.2
if (column.isKey() && column.getType().isFloatingPointType()) {
throw new DdlException("Do not support float/double type on key column, you can change it to decimal");
}
}

// check b.3
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
Set<String> originColumns = new TreeSet<String>(String.CASE_INSENSITIVE_ORDER);
for (Column column : newMVColumns) {
originColumns.add(CreateMaterializedViewStmt.mvColumnBreaker(column.getName()));
}
for (Column column : olapTable.getBaseSchema()) {
if (column.isKey() && !originColumns.contains(column.getName())) {
throw new DdlException("The materialized view of uniq table must contain all key columns. column:"
+ column.getName());
}
}
}

if (newMVColumns.size() == olapTable.getBaseSchema().size()
&& createMvCommand.getMVKeysType() == olapTable.getKeysType()) {
boolean allKeysMatch = true;
for (int i = 0; i < newMVColumns.size(); i++) {
if (!CreateMaterializedViewStmt.mvColumnBreaker(newMVColumns.get(i).getName())
.equalsIgnoreCase(olapTable.getBaseSchema().get(i).getName())) {
allKeysMatch = false;
break;
}
}
if (allKeysMatch) {
throw new DdlException("MV same with base table is useless.");
}
}

if (KeysType.UNIQUE_KEYS == olapTable.getKeysType() && olapTable.hasDeleteSign()) {
newMVColumns.add(new Column(olapTable.getDeleteSignColumn()));
}
if (KeysType.UNIQUE_KEYS == olapTable.getKeysType() && olapTable.hasSequenceCol()) {
newMVColumns.add(new Column(olapTable.getSequenceCol()));
}
if (olapTable.storeRowColumn()) {
Column newColumn = new Column(olapTable.getRowStoreCol());
newColumn.setAggregationType(AggregateType.NONE, true);
newMVColumns.add(newColumn);
}
// if the column is complex type, we forbid to create materialized view
for (Column column : newMVColumns) {
if (column.getDataType().isComplexType() || column.getDataType().isJsonbType()) {
throw new DdlException("The " + column.getDataType() + " column[" + column + "] not support "
+ "to create materialized view");
}
if (createMvCommand.getMVKeysType() != KeysType.AGG_KEYS
&& (column.getType().isBitmapType() || column.getType().isHllType())) {
throw new DdlException("Bitmap/HLL type only support aggregate table");
}
}

if (olapTable.getEnableLightSchemaChange()) {
int nextColUniqueId = Column.COLUMN_UNIQUE_ID_INIT_VALUE + 1;
for (Column column : newMVColumns) {
column.setUniqueId(nextColUniqueId);
nextColUniqueId++;
}
} else {
newMVColumns.forEach(column -> {
column.setUniqueId(Column.COLUMN_UNIQUE_ID_INIT_VALUE);
});
}
if (LOG.isDebugEnabled()) {
LOG.debug("lightSchemaChange:{}, newMVColumns:{}", olapTable.getEnableLightSchemaChange(), newMVColumns);
}
return newMVColumns;
}

private List<Column> checkAndPrepareMaterializedView(CreateMaterializedViewStmt addMVClause, OlapTable olapTable)
throws DdlException {
// check if mv index already exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,8 @@ public MVColumnItem(String name, Type type, AggregateType aggregateType, boolean
baseColumnNames = new HashSet<>();

Map<Long, Set<String>> tableIdToColumnNames = defineExpr.getTableIdToColumnNames();
if (defineExpr instanceof SlotRef) {
baseColumnNames = new HashSet<>();
baseColumnNames.add(this.name);
} else if (tableIdToColumnNames.size() == 1) {

if (tableIdToColumnNames.size() == 1) {
for (Map.Entry<Long, Set<String>> entry : tableIdToColumnNames.entrySet()) {
baseColumnNames = entry.getValue();
}
Expand Down Expand Up @@ -104,10 +102,8 @@ public MVColumnItem(Expr defineExpr) throws AnalysisException {
}

Map<Long, Set<String>> tableIdToColumnNames = defineExpr.getTableIdToColumnNames();
if (defineExpr instanceof SlotRef) {
baseColumnNames = new HashSet<>();
baseColumnNames.add(this.name);
} else if (tableIdToColumnNames.size() == 1) {

if (tableIdToColumnNames.size() == 1) {
for (Map.Entry<Long, Set<String>> entry : tableIdToColumnNames.entrySet()) {
baseColumnNames = entry.getValue();
}
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.jobs.load.LabelProcessor;
import org.apache.doris.nereids.trees.plans.commands.CreateMaterializedViewCommand;
import org.apache.doris.nereids.trees.plans.commands.DropCatalogRecycleBinCommand.IdType;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRefreshInfo;
Expand Down Expand Up @@ -4741,6 +4742,11 @@ public void createMaterializedView(CreateMaterializedViewStmt stmt)
this.alter.processCreateMaterializedView(stmt);
}

public void createMaterializedView(CreateMaterializedViewCommand command)
throws AnalysisException, DdlException, MetaNotFoundException {
this.alter.processCreateMaterializedView(command);
}

public void dropMaterializedView(DropMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException {
this.alter.processDropMaterializedView(stmt);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public SlotDescriptor createSlotDesc(TupleDescriptor tupleDesc, SlotReference sl
}
}
slotRef.setTable(table);
slotRef.setLabel(slotReference.getName());
slotRef.setLabel("`" + slotReference.getName() + "`");
if (column.isPresent()) {
slotDescriptor.setAutoInc(column.get().isAutoInc());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@
import org.apache.doris.nereids.trees.plans.commands.CreateFileCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateJobCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateMaterializedViewCommand;
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateRoleCommand;
Expand Down Expand Up @@ -947,8 +948,7 @@ public Command visitCreateMTMV(CreateMTMVContext ctx) {
if (ctx.buildMode() == null && ctx.refreshMethod() == null && ctx.refreshTrigger() == null
&& ctx.cols == null && ctx.keys == null
&& ctx.HASH() == null && ctx.RANDOM() == null && ctx.BUCKETS() == null) {
// TODO support create sync mv
return new UnsupportedCommand();
return visitCreateSyncMvCommand(ctx);
}

List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
Expand Down Expand Up @@ -985,6 +985,14 @@ public Command visitCreateMTMV(CreateMTMVContext ctx) {
));
}

private Command visitCreateSyncMvCommand(CreateMTMVContext ctx) {
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
LogicalPlan logicalPlan = new UnboundResultSink<>(visitQuery(ctx.query()));
Map<String, String> properties = ctx.propertyClause() != null
? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : Maps.newHashMap();
return new CreateMaterializedViewCommand(new TableNameInfo(nameParts), logicalPlan, properties);
}

/**
* get MTMVPartitionDefinition
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,4 +337,8 @@ private static Supplier<Optional<String>> buildInternalName(
public String getQualifiedNameWithBackquote() throws UnboundException {
return Utils.qualifiedNameWithBackquote(getQualifier(), getName());
}

public boolean hasAutoInc() {
return column != null ? column.isAutoInc() : false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public enum PlanType {
SELECT_INTO_OUTFILE_COMMAND,
UPDATE_COMMAND,
CREATE_MTMV_COMMAND,
CREATE_MATERIALIZED_VIEW_COMMAND,
CREATE_JOB_COMMAND,
PAUSE_JOB_COMMAND,
CANCEL_JOB_COMMAND,
Expand Down
Loading

0 comments on commit e411b92

Please sign in to comment.