Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](nereids)support CreateMaterializedViewCommand in nereids #45674

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.doris.common.util.PropertyAnalyzer.RewriteProperty;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.nereids.trees.plans.commands.AlterTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateMaterializedViewCommand;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AlterMTMV;
import org.apache.doris.persist.AlterViewInfo;
Expand Down Expand Up @@ -131,6 +132,18 @@ public void processCreateMaterializedView(CreateMaterializedViewStmt stmt)
((MaterializedViewHandler) materializedViewHandler).processCreateMaterializedView(stmt, db, olapTable);
}

public void processCreateMaterializedView(CreateMaterializedViewCommand command)
throws DdlException, AnalysisException, MetaNotFoundException {
String tableName = command.getBaseIndexName();
// check db
String dbName = command.getDBName();
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
Env.getCurrentInternalCatalog().checkAvailableCapacity(db);

OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP);
((MaterializedViewHandler) materializedViewHandler).processCreateMaterializedView(command, db, olapTable);
}

public void processDropMaterializedView(DropMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException {
TableName tableName = stmt.getTableName();
String dbName = tableName.getDb();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.commands.CreateMaterializedViewCommand;
import org.apache.doris.persist.BatchDropInfo;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.EditLog;
Expand Down Expand Up @@ -232,6 +233,65 @@ public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause
}
}

/**
* There are 2 main steps in this function.
* Step1: validate the request.
* Step1.1: semantic analysis: the name of olapTable must be same as the base table name in createMvCommand.
* Step1.2: base table validation: the status of base table and partition could be NORMAL.
* Step1.3: materialized view validation: the name and columns of mv is checked.
* Step2: create mv job
* @param createMvCommand
* @param db
* @param olapTable
* @throws DdlException
*/
public void processCreateMaterializedView(CreateMaterializedViewCommand createMvCommand, Database db,
OlapTable olapTable) throws DdlException, AnalysisException {
// wait wal delete
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
olapTable.writeLockOrDdlException();
try {
olapTable.checkNormalStateForAlter();
if (olapTable.existTempPartitions()) {
throw new DdlException("Can not alter table when there are temp partitions in table");
}

// Step1.1: semantic analysis
// TODO(ML): support the materialized view as base index
if (!createMvCommand.getBaseIndexName().equals(olapTable.getName())) {
throw new DdlException("The name of table in from clause must be same as the name of alter table");
}
// Step1.2: base table validation
String baseIndexName = createMvCommand.getBaseIndexName();
String mvIndexName = createMvCommand.getMVName();
LOG.info("process add materialized view[{}] based on [{}]", mvIndexName, baseIndexName);

// avoid conflict against with batch add rollup job
Preconditions.checkState(olapTable.getState() == OlapTableState.NORMAL);

long baseIndexId = checkAndGetBaseIndex(baseIndexName, olapTable);
// Step1.3: mv clause validation
List<Column> mvColumns = checkAndPrepareMaterializedView(createMvCommand, olapTable);

// Step2: create mv job
RollupJobV2 rollupJobV2 =
createMaterializedViewJob(null, mvIndexName, baseIndexName, mvColumns,
createMvCommand.getWhereClauseItemColumn(olapTable),
createMvCommand.getProperties(), olapTable, db, baseIndexId,
createMvCommand.getMVKeysType(), createMvCommand.getOriginStatement());

addAlterJobV2(rollupJobV2);

olapTable.setState(OlapTableState.ROLLUP);

Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
LOG.info("finished to create materialized view job: {}", rollupJobV2.getJobId());
} finally {
olapTable.writeUnlock();
}
}

/**
* There are 2 main steps.
* Step1: validate the request
Expand Down Expand Up @@ -479,6 +539,181 @@ private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, Stri
return mvJob;
}

private List<Column> checkAndPrepareMaterializedView(CreateMaterializedViewCommand createMvCommand,
OlapTable olapTable)
throws DdlException {
// check if mv index already exists
if (olapTable.hasMaterializedIndex(createMvCommand.getMVName())) {
throw new DdlException("Materialized view[" + createMvCommand.getMVName() + "] already exists");
}
if (olapTable.getRowStoreCol() != null) {
throw new DdlException("RowStore table can't create materialized view.");
}
// check if mv columns are valid
// a. Aggregate table:
// 1. all slot's aggregationType must same with value mv column
// 2. all slot's isKey must same with mv column
// 3. value column'define expr must be slot (except all slot belong replace family)
// b. Unique table:
// 1. mv must not contain group expr
// 2. all slot's isKey same with mv column
// 3. mv must contain all key column
// c. Duplicate table:
// 1. Columns resolved by semantics are legal
// 2. Key column not allow float/double type.

// update mv columns
List<MVColumnItem> mvColumnItemList = createMvCommand.getMVColumnItemList();
List<Column> newMVColumns = Lists.newArrayList();

if (olapTable.getKeysType().isAggregationFamily()) {
if (olapTable.getKeysType() == KeysType.AGG_KEYS && createMvCommand.getMVKeysType() != KeysType.AGG_KEYS) {
throw new DdlException("The materialized view of aggregation table must has grouping columns");
}
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
&& createMvCommand.getMVKeysType() == KeysType.AGG_KEYS) {
// check b.1
throw new DdlException("The materialized view of unique table must not has grouping columns");
}

for (MVColumnItem mvColumnItem : mvColumnItemList) {
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
mvColumnItem.setIsKey(false);
for (String slotName : mvColumnItem.getBaseColumnNames()) {
if (olapTable
.getColumn(MaterializedIndexMeta
.normalizeName(CreateMaterializedViewStmt.mvColumnBreaker(slotName)))
.isKey()) {
mvColumnItem.setIsKey(true);
}
}
if (!mvColumnItem.isKey()) {
mvColumnItem.setAggregationType(AggregateType.REPLACE, true);
}
}

// check a.2 and b.2
for (String slotName : mvColumnItem.getBaseColumnNames()) {
if (olapTable
.getColumn(MaterializedIndexMeta
.normalizeName(CreateMaterializedViewStmt.mvColumnBreaker(slotName)))
.isKey() != mvColumnItem.isKey()) {
throw new DdlException("The mvItem[" + mvColumnItem.getName()
+ "]'s isKey must same with all slot, mvItem.isKey="
+ (mvColumnItem.isKey() ? "true" : "false"));
}
}

if (!mvColumnItem.isKey() && olapTable.getKeysType() == KeysType.AGG_KEYS) {
// check a.1
for (String slotName : mvColumnItem.getBaseColumnNames()) {
if (olapTable
.getColumn(MaterializedIndexMeta
.normalizeName(CreateMaterializedViewStmt.mvColumnBreaker(slotName)))
.getAggregationType() != mvColumnItem.getAggregationType()) {
throw new DdlException("The mvItem[" + mvColumnItem.getName()
+ "]'s AggregationType must same with all slot");
}
}

// check a.3
if (!mvColumnItem.getAggregationType().isReplaceFamily()
&& !(mvColumnItem.getDefineExpr() instanceof SlotRef)
&& !((mvColumnItem.getDefineExpr() instanceof CastExpr)
&& mvColumnItem.getDefineExpr().getChild(0) instanceof SlotRef)) {
throw new DdlException(
"The mvItem[" + mvColumnItem.getName() + "] require slot because it is value column");
}
}
newMVColumns.add(mvColumnItem.toMVColumn(olapTable));
}
} else {
for (MVColumnItem mvColumnItem : mvColumnItemList) {
Set<String> names = mvColumnItem.getBaseColumnNames();
if (names == null) {
throw new DdlException("Base columns is null");
}

newMVColumns.add(mvColumnItem.toMVColumn(olapTable));
}
}

for (Column column : newMVColumns) {
// check c.2
if (column.isKey() && column.getType().isFloatingPointType()) {
throw new DdlException("Do not support float/double type on key column, you can change it to decimal");
}
}

// check b.3
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
Set<String> originColumns = new TreeSet<String>(String.CASE_INSENSITIVE_ORDER);
for (Column column : newMVColumns) {
originColumns.add(CreateMaterializedViewStmt.mvColumnBreaker(column.getName()));
}
for (Column column : olapTable.getBaseSchema()) {
if (column.isKey() && !originColumns.contains(column.getName())) {
throw new DdlException("The materialized view of uniq table must contain all key columns. column:"
+ column.getName());
}
}
}

if (newMVColumns.size() == olapTable.getBaseSchema().size()
&& createMvCommand.getMVKeysType() == olapTable.getKeysType()) {
boolean allKeysMatch = true;
for (int i = 0; i < newMVColumns.size(); i++) {
if (!CreateMaterializedViewStmt.mvColumnBreaker(newMVColumns.get(i).getName())
.equalsIgnoreCase(olapTable.getBaseSchema().get(i).getName())) {
allKeysMatch = false;
break;
}
}
if (allKeysMatch) {
throw new DdlException("MV same with base table is useless.");
}
}

if (KeysType.UNIQUE_KEYS == olapTable.getKeysType() && olapTable.hasDeleteSign()) {
newMVColumns.add(new Column(olapTable.getDeleteSignColumn()));
}
if (KeysType.UNIQUE_KEYS == olapTable.getKeysType() && olapTable.hasSequenceCol()) {
newMVColumns.add(new Column(olapTable.getSequenceCol()));
}
if (olapTable.storeRowColumn()) {
Column newColumn = new Column(olapTable.getRowStoreCol());
newColumn.setAggregationType(AggregateType.NONE, true);
newMVColumns.add(newColumn);
}
// if the column is complex type, we forbid to create materialized view
for (Column column : newMVColumns) {
if (column.getDataType().isComplexType() || column.getDataType().isJsonbType()) {
throw new DdlException("The " + column.getDataType() + " column[" + column + "] not support "
+ "to create materialized view");
}
if (createMvCommand.getMVKeysType() != KeysType.AGG_KEYS
&& (column.getType().isBitmapType() || column.getType().isHllType())) {
throw new DdlException("Bitmap/HLL type only support aggregate table");
}
}

if (olapTable.getEnableLightSchemaChange()) {
int nextColUniqueId = Column.COLUMN_UNIQUE_ID_INIT_VALUE + 1;
for (Column column : newMVColumns) {
column.setUniqueId(nextColUniqueId);
nextColUniqueId++;
}
} else {
newMVColumns.forEach(column -> {
column.setUniqueId(Column.COLUMN_UNIQUE_ID_INIT_VALUE);
});
}
if (LOG.isDebugEnabled()) {
LOG.debug("lightSchemaChange:{}, newMVColumns:{}", olapTable.getEnableLightSchemaChange(), newMVColumns);
}
return newMVColumns;
}

private List<Column> checkAndPrepareMaterializedView(CreateMaterializedViewStmt addMVClause, OlapTable olapTable)
throws DdlException {
// check if mv index already exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,15 @@ public MVColumnItem(String name, Type type, AggregateType aggregateType, boolean
this.aggregationType = aggregateType;
this.isAggregationTypeImplicit = isAggregationTypeImplicit;
this.defineExpr = defineExpr;
baseColumnNames = new HashSet<>();

Map<Long, Set<String>> tableIdToColumnNames = defineExpr.getTableIdToColumnNames();
if (defineExpr instanceof SlotRef) {
baseColumnNames = new HashSet<>();
baseColumnNames.add(this.name);
} else if (tableIdToColumnNames.size() == 1) {

if (tableIdToColumnNames.size() == 1) {
for (Map.Entry<Long, Set<String>> entry : tableIdToColumnNames.entrySet()) {
baseColumnNames = entry.getValue();
}
} else {
baseColumnNames = new HashSet<>();
}
}

Expand Down Expand Up @@ -104,13 +103,13 @@ public MVColumnItem(Expr defineExpr) throws AnalysisException {
}

Map<Long, Set<String>> tableIdToColumnNames = defineExpr.getTableIdToColumnNames();
if (defineExpr instanceof SlotRef) {
baseColumnNames = new HashSet<>();
baseColumnNames.add(this.name);
} else if (tableIdToColumnNames.size() == 1) {

if (tableIdToColumnNames.size() == 1) {
for (Map.Entry<Long, Set<String>> entry : tableIdToColumnNames.entrySet()) {
baseColumnNames = entry.getValue();
}
} else {
baseColumnNames = new HashSet<>();
}
}

Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.jobs.load.LabelProcessor;
import org.apache.doris.nereids.trees.plans.commands.AlterTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateMaterializedViewCommand;
import org.apache.doris.nereids.trees.plans.commands.DropCatalogRecycleBinCommand.IdType;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRefreshInfo;
Expand Down Expand Up @@ -4753,6 +4754,11 @@ public void createMaterializedView(CreateMaterializedViewStmt stmt)
this.alter.processCreateMaterializedView(stmt);
}

public void createMaterializedView(CreateMaterializedViewCommand command)
throws AnalysisException, DdlException, MetaNotFoundException {
this.alter.processCreateMaterializedView(command);
}

public void dropMaterializedView(DropMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException {
this.alter.processDropMaterializedView(stmt);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@
import org.apache.doris.nereids.trees.plans.commands.CreateFileCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateJobCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateMaterializedViewCommand;
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateRoleCommand;
Expand Down Expand Up @@ -1024,8 +1025,7 @@ public Command visitCreateMTMV(CreateMTMVContext ctx) {
if (ctx.buildMode() == null && ctx.refreshMethod() == null && ctx.refreshTrigger() == null
&& ctx.cols == null && ctx.keys == null
&& ctx.HASH() == null && ctx.RANDOM() == null && ctx.BUCKETS() == null) {
// TODO support create sync mv
return new UnsupportedCommand();
return visitCreateSyncMvCommand(ctx);
}

List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
Expand Down Expand Up @@ -1062,6 +1062,14 @@ public Command visitCreateMTMV(CreateMTMVContext ctx) {
));
}

private Command visitCreateSyncMvCommand(CreateMTMVContext ctx) {
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
LogicalPlan logicalPlan = new UnboundResultSink<>(visitQuery(ctx.query()));
Map<String, String> properties = ctx.propertyClause() != null
? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : Maps.newHashMap();
return new CreateMaterializedViewCommand(new TableNameInfo(nameParts), logicalPlan, properties);
}

/**
* get MTMVPartitionDefinition
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,4 +337,8 @@ private static Supplier<Optional<String>> buildInternalName(
public String getQualifiedNameWithBackquote() throws UnboundException {
return Utils.qualifiedNameWithBackquote(getQualifier(), getName());
}

public boolean hasAutoInc() {
return column != null ? column.isAutoInc() : false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public enum PlanType {
SELECT_INTO_OUTFILE_COMMAND,
UPDATE_COMMAND,
CREATE_MTMV_COMMAND,
CREATE_MATERIALIZED_VIEW_COMMAND,
CREATE_JOB_COMMAND,
PAUSE_JOB_COMMAND,
CANCEL_JOB_COMMAND,
Expand Down
Loading
Loading