Skip to content

Commit

Permalink
[Feature] Support multi expression partition table (#52407)
Browse files Browse the repository at this point in the history
Signed-off-by: meegoo <[email protected]>
  • Loading branch information
meegoo authored Nov 13, 2024
1 parent 1d43043 commit 93148a9
Show file tree
Hide file tree
Showing 11 changed files with 482 additions and 23 deletions.
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/analysis/Expr.java
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,17 @@ public SlotRef unwrapSlotRef(boolean implicitOnly) {
return null;
}

public List<SlotRef> collectAllSlotRefs() {
List<SlotRef> result = Lists.newArrayList();
if (this instanceof SlotRef) {
result.add((SlotRef) this);
}
for (Expr child : children) {
result.addAll(child.collectAllSlotRefs());
}
return result;
}

/**
* Returns the first child if this Expr is a CastExpr. Otherwise, returns 'this'.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public class FeConstants {
// the raw data of one tablet equals to 10GB approximately
public static final long AUTO_DISTRIBUTION_UNIT = 3221225472L;

public static final String GENERATED_PARTITION_COLUMN_PREFIX = "__generated_partition_column_";

// Max counter num of TOP K function
public static final int MAX_COUNTER_NUM_OF_TOP_K = 100000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.CatalogMgr;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.server.TemporaryTableMgr;
import com.starrocks.sql.ast.ColumnDef;
import com.starrocks.sql.ast.CreateTableStmt;
Expand Down Expand Up @@ -294,6 +293,10 @@ private static void analyzeKeysDesc(CreateTableStmt stmt) {
} else {
int keyLength = 0;
for (ColumnDef columnDef : columnDefs) {
// generated column should not be key
if (columnDef.isGeneratedColumn()) {
break;
}
keyLength += columnDef.getType().getIndexSize();
if (keysColumnNames.size() >= FeConstants.SHORTKEY_MAX_COLUMN_COUNT
|| keyLength > FeConstants.SHORTKEY_MAXSIZE_BYTES) {
Expand Down Expand Up @@ -472,6 +475,15 @@ public static void analyzePartitionDesc(CreateTableStmt stmt) {
} catch (AnalysisException e) {
throw new SemanticException(e.getMessage());
}
if (partitionDesc instanceof ListPartitionDesc) {
ListPartitionDesc listPartitionDesc = (ListPartitionDesc) partitionDesc;
if (listPartitionDesc.getPartitionExprs().size() > 0 &&
(stmt.getKeysDesc().getKeysType() == KeysType.AGG_KEYS
|| stmt.getKeysDesc().getKeysType() == KeysType.UNIQUE_KEYS)) {
throw new SemanticException("expression partition base on generated column"
+ " doest not support AGG_KEYS or UNIQUE_KEYS", partitionDesc.getPos());
}
}
} else if (partitionDesc instanceof ExpressionPartitionDesc) {
ExpressionPartitionDesc expressionPartitionDesc = (ExpressionPartitionDesc) partitionDesc;
try {
Expand Down Expand Up @@ -571,10 +583,6 @@ public static void analyzeGeneratedColumn(CreateTableStmt stmt, ConnectContext c
throw new SemanticException("Generated Column does not support AGG table");
}

if (RunMode.isSharedDataMode()) {
throw new SemanticException("Does not support generated column in shared data cluster yet");
}

final TableName tableNameObject = stmt.getDbTbl();

List<Column> columns = stmt.getColumns();
Expand All @@ -591,7 +599,7 @@ public static void analyzeGeneratedColumn(CreateTableStmt stmt, ConnectContext c

if (column.isGeneratedColumn()) {
if (keysDesc.containsCol(column.getName())) {
throw new SemanticException("Generated Column can not be KEY");
throw new SemanticException("Generated Column " + column.getName() + " can not be KEY");
}

Expr expr = column.getGeneratedColumnExpr(columns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.LiteralExpr;
import com.starrocks.analysis.SlotRef;
import com.starrocks.catalog.AggregateType;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.ColumnId;
Expand Down Expand Up @@ -52,16 +54,30 @@ public class ListPartitionDesc extends PartitionDesc {
// for automatic partition table is ture. otherwise is false
protected boolean isAutoPartitionTable = false;

protected List<Expr> partitionExprs = Lists.newArrayList();

public ListPartitionDesc(List<String> partitionColNames,
List<PartitionDesc> partitionDescs) {
this(partitionColNames, partitionDescs, NodePosition.ZERO);
this(partitionColNames, partitionDescs, Lists.newArrayList(), NodePosition.ZERO);
}

public ListPartitionDesc(List<String> partitionColNames,
List<PartitionDesc> partitionDescs, NodePosition pos) {
this(partitionColNames, partitionDescs, Lists.newArrayList(), pos);
}

public ListPartitionDesc(List<String> partitionColNames,
List<PartitionDesc> partitionDescs,
List<Expr> partitionExprs) {
this(partitionColNames, partitionDescs, partitionExprs, NodePosition.ZERO);
}

public ListPartitionDesc(List<String> partitionColNames,
List<PartitionDesc> partitionDescs, List<Expr> partitionExprs, NodePosition pos) {
super(pos);
super.type = PartitionType.LIST;
this.partitionColNames = partitionColNames;
this.partitionExprs = partitionExprs;
this.singleListPartitionDescs = Lists.newArrayList();
this.multiListPartitionDescs = Lists.newArrayList();
if (partitionDescs != null) {
Expand Down Expand Up @@ -90,6 +106,10 @@ public List<String> getPartitionColNames() {
return partitionColNames;
}

public List<Expr> getPartitionExprs() {
return partitionExprs;
}

public List<String> findAllPartitionNames() {
List<String> partitionNames = new ArrayList<>();
this.singleListPartitionDescs.forEach(desc -> partitionNames.add(desc.getPartitionName()));
Expand All @@ -101,6 +121,8 @@ public List<String> findAllPartitionNames() {
public void analyze(List<ColumnDef> columnDefs, Map<String, String> tableProperties) throws AnalysisException {
// analyze partition columns
List<ColumnDef> columnDefList = this.analyzePartitionColumns(columnDefs);
// analyze partition expr
this.analyzePartitionExprs(columnDefs);
// analyze single list property
this.analyzeSingleListPartition(tableProperties, columnDefList);
// analyze multi list partition
Expand All @@ -109,6 +131,19 @@ public void analyze(List<ColumnDef> columnDefs, Map<String, String> tablePropert
this.postAnalyzePartitionColumns(columnDefList);
}

public void analyzePartitionExprs(List<ColumnDef> columnDefs) throws AnalysisException {
List<String> slotRefs = partitionExprs.stream()
.flatMap(e -> e.collectAllSlotRefs().stream())
.map(SlotRef::getColumnName)
.collect(Collectors.toList());
for (ColumnDef columnDef : columnDefs) {
if (slotRefs.contains(columnDef.getName()) && !columnDef.isKey()
&& columnDef.getAggregateType() != AggregateType.NONE) {
throw new AnalysisException("The partition expr should base on key column");
}
}
}

public List<ColumnDef> analyzePartitionColumns(List<ColumnDef> columnDefs) throws AnalysisException {
if (this.partitionColNames == null || this.partitionColNames.isEmpty()) {
throw new AnalysisException("No partition columns.");
Expand All @@ -127,7 +162,8 @@ public List<ColumnDef> analyzePartitionColumns(List<ColumnDef> columnDefs) throw
throw new AnalysisException(String.format("Invalid partition column '%s': %s",
columnDef.getName(), "invalid data type " + columnDef.getType()));
}
if (!columnDef.isKey() && columnDef.getAggregateType() != AggregateType.NONE) {
if (!columnDef.isKey() && columnDef.getAggregateType() != AggregateType.NONE
&& !columnDef.isGeneratedColumn()) {
throw new AnalysisException("The partition column could not be aggregated column"
+ " and unique table's partition column must be key column");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,17 @@ public boolean isMonotonicFunction(CallOperator call) {

FunctionInvoker invoker = functions.get(signature);

return invoker != null && invoker.isMonotonic;
return invoker != null && isMonotonicFunc(invoker, call);
}

private boolean isMonotonicFunc(FunctionInvoker invoker, CallOperator operator) {
if (!invoker.isMonotonic) {
return false;
}

if (FunctionSet.DATE_FORMAT.equalsIgnoreCase(invoker.getSignature().getName())) {
if (FunctionSet.DATE_FORMAT.equalsIgnoreCase(invoker.getSignature().getName())
|| (FunctionSet.FROM_UNIXTIME.equalsIgnoreCase(invoker.getSignature().getName())
&& operator.getChildren().size() == 2)) {
String pattern = operator.getChild(1).toString();
if (pattern.isEmpty()) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,8 @@ public static ConstantOperator unixTimestampNow() {
}

@ConstantFunction.List(list = {
@ConstantFunction(name = "unix_timestamp", argTypes = {DATETIME}, returnType = BIGINT),
@ConstantFunction(name = "unix_timestamp", argTypes = {DATE}, returnType = BIGINT)
@ConstantFunction(name = "unix_timestamp", argTypes = {DATETIME}, returnType = BIGINT, isMonotonic = true),
@ConstantFunction(name = "unix_timestamp", argTypes = {DATE}, returnType = BIGINT, isMonotonic = true)
})
public static ConstantOperator unixTimestamp(ConstantOperator arg) {
LocalDateTime dt = arg.getDatetime();
Expand All @@ -624,8 +624,8 @@ public static ConstantOperator unixTimestamp(ConstantOperator arg) {
}

@ConstantFunction.List(list = {
@ConstantFunction(name = "from_unixtime", argTypes = {INT}, returnType = VARCHAR),
@ConstantFunction(name = "from_unixtime", argTypes = {BIGINT}, returnType = VARCHAR)
@ConstantFunction(name = "from_unixtime", argTypes = {INT}, returnType = VARCHAR, isMonotonic = true),
@ConstantFunction(name = "from_unixtime", argTypes = {BIGINT}, returnType = VARCHAR, isMonotonic = true)
})
public static ConstantOperator fromUnixTime(ConstantOperator unixTime) throws AnalysisException {
long value = 0;
Expand All @@ -644,7 +644,7 @@ public static ConstantOperator fromUnixTime(ConstantOperator unixTime) throws An
}

@ConstantFunction.List(list = {
@ConstantFunction(name = "from_unixtime_ms", argTypes = {BIGINT}, returnType = VARCHAR)
@ConstantFunction(name = "from_unixtime_ms", argTypes = {BIGINT}, returnType = VARCHAR, isMonotonic = true),
})
public static ConstantOperator fromUnixTimeMs(ConstantOperator unixTime) throws AnalysisException {
long millisecond = unixTime.getBigint();
Expand All @@ -660,8 +660,8 @@ public static ConstantOperator fromUnixTimeMs(ConstantOperator unixTime) throws
}

@ConstantFunction.List(list = {
@ConstantFunction(name = "from_unixtime", argTypes = {INT, VARCHAR}, returnType = VARCHAR),
@ConstantFunction(name = "from_unixtime", argTypes = {BIGINT, VARCHAR}, returnType = VARCHAR)
@ConstantFunction(name = "from_unixtime", argTypes = {INT, VARCHAR}, returnType = VARCHAR, isMonotonic = true),
@ConstantFunction(name = "from_unixtime", argTypes = {BIGINT, VARCHAR}, returnType = VARCHAR, isMonotonic = true)
})
public static ConstantOperator fromUnixTime(ConstantOperator unixTime, ConstantOperator fmtLiteral)
throws AnalysisException {
Expand Down
Loading

0 comments on commit 93148a9

Please sign in to comment.