From 84856782d9d653af17b98d32afae9deac0b7266f Mon Sep 17 00:00:00 2001 From: zhangdong Date: Wed, 13 Nov 2024 19:39:00 +0800 Subject: [PATCH 01/24] 1 --- .../paimon/PaimonExternalTable.java | 150 ++++++++++++++++- .../datasource/paimon/PaimonPartition.java | 55 +++++++ .../paimon/PaimonPartitionInfo.java | 41 +++++ .../paimon/PaimonSchemaCacheValue.java | 23 ++- .../doris/datasource/paimon/PaimonUtil.java | 151 ++++++++++++++++++ .../apache/doris/mtmv/MTMVRelatedTableIf.java | 6 +- 6 files changed, 418 insertions(+), 8 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index c9eaf1b7df32ef..dc190398934967 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -18,10 +18,22 @@ package org.apache.doris.datasource.paimon; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.mtmv.MTMVBaseTableIf; +import org.apache.doris.mtmv.MTMVRefreshContext; +import org.apache.doris.mtmv.MTMVRelatedTableIf; +import org.apache.doris.mtmv.MTMVSnapshotIf; +import org.apache.doris.mtmv.MTMVTimestampSnapshot; +import org.apache.doris.mtmv.MTMVVersionSnapshot; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -30,25 +42,34 @@ import org.apache.doris.thrift.TTableType; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.system.PartitionsTable; +import org.apache.paimon.table.system.SnapshotsTable; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; -public class PaimonExternalTable extends ExternalTable { +public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf { private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class); @@ -73,18 +94,92 @@ public Table getPaimonTable() { return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null); } + public PaimonPartitionInfo getPartitionInfoFromCache() throws AnalysisException { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + throw new AnalysisException("not present"); + } + return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); + } + + public List getPartitionColumnsFromCache() throws AnalysisException { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + throw new AnalysisException("not present"); + } + return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionColumns(); + } + + public long getLatestSnapshotIdFromCache() throws AnalysisException { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + throw new AnalysisException("not present"); + } + return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getSnapshootId(); + } + @Override public Optional initSchema() { Table paimonTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); TableSchema schema = ((FileStoreTable) paimonTable).schema(); List columns = schema.fields(); List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); + Set partitionColumnNames = Sets.newHashSet(paimonTable.partitionKeys()); + List partitionColumns = Lists.newArrayList(); for (DataField field : columns) { - tmpSchema.add(new Column(field.name().toLowerCase(), + Column column = new Column(field.name().toLowerCase(), paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, - field.id())); + field.id()); + tmpSchema.add(column); + if (partitionColumnNames.contains(field.name())) { + partitionColumns.add(column); + } + } + try { + // after 0.9.0 paimon will support table.getLatestSnapshotId() + long latestSnapshotId = loadLatestSnapshotId(); + PaimonPartitionInfo partitionInfo = loadPartitionInfo(partitionColumns); + return Optional.of(new PaimonSchemaCacheValue(tmpSchema, partitionColumns, paimonTable, latestSnapshotId, + partitionInfo)); + } catch (IOException | AnalysisException e) { + LOG.warn(e); + return Optional.empty(); + } + } + + private long loadLatestSnapshotId() throws IOException { + Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, + name + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); + List rows = PaimonUtil.read(table, new int[][] {{0}});// snapshotId + List res = Lists.newArrayListWithCapacity(rows.size()); + long latestSnapshotId = 0L; + for (InternalRow row : rows) { + long snapshotId = row.getLong(0); + if (snapshotId > latestSnapshotId) { + latestSnapshotId = snapshotId; + } } - return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable)); + return latestSnapshotId; + } + + private PaimonPartitionInfo loadPartitionInfo(List partitionColumns) throws IOException, AnalysisException { + List paimonPartitions = loadPartitions(); + return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); + } + + private List loadPartitions() + throws IOException { + Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, + name + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); + List rows = PaimonUtil.read(table, null); + List res = Lists.newArrayListWithCapacity(rows.size()); + for (InternalRow row : rows) { + res.add(PaimonUtil.rowToPartition(row)); + } + return res; } private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { @@ -205,4 +300,51 @@ public long fetchRowCount() { } return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } + + @Override + public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { + Env.getCurrentEnv().getRefreshManager() + .refreshTable(getCatalog().getName(), getDbName(), getName(), true); + } + + @Override + public Map getAndCopyPartitionItems() throws AnalysisException { + return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + } + + @Override + public PartitionType getPartitionType() throws AnalysisException { + return getPartitionColumnsFromCache().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED; + } + + @Override + public Set getPartitionColumnNames() throws DdlException, AnalysisException { + return getPartitionColumnsFromCache().stream() + .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); + } + + @Override + public List getPartitionColumns() throws AnalysisException { + return getPartitionColumnsFromCache(); + } + + @Override + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) + throws AnalysisException { + PaimonPartition paimonPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName); + if (paimonPartition == null) { + throw new AnalysisException("can not find partition: " + partitionName); + } + return new MTMVTimestampSnapshot(paimonPartition.getLastUpdateTime()); + } + + @Override + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException { + return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); + } + + @Override + public boolean isPartitionColumnAllowNull() { + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java new file mode 100644 index 00000000000000..d0cccd0c040970 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +public class PaimonPartition { + private String partitionValues; + private long recordCount; + private long fileSizeInBytes; + private long fileCount; + private long lastUpdateTime; + + public PaimonPartition(String partitionValues, long recordCount, long fileSizeInBytes, long fileCount, + long lastUpdateTime) { + this.partitionValues = partitionValues; + this.recordCount = recordCount; + this.fileSizeInBytes = fileSizeInBytes; + this.fileCount = fileCount; + this.lastUpdateTime = lastUpdateTime; + } + + public String getPartitionValues() { + return partitionValues; + } + + public long getRecordCount() { + return recordCount; + } + + public long getFileSizeInBytes() { + return fileSizeInBytes; + } + + public long getFileCount() { + return fileCount; + } + + public long getLastUpdateTime() { + return lastUpdateTime; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java new file mode 100644 index 00000000000000..c3cf5dc4da313c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.PartitionItem; + +import java.util.Map; + +public class PaimonPartitionInfo { + private Map nameToPartitionItem; + private Map nameToPartition; + + public PaimonPartitionInfo(Map nameToPartitionItem, + Map nameToPartition) { + this.nameToPartitionItem = nameToPartitionItem; + this.nameToPartition = nameToPartition; + } + + public Map getNameToPartitionItem() { + return nameToPartitionItem; + } + + public Map getNameToPartition() { + return nameToPartition; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java index aaaefe7f32db2b..20d27b2425df24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java @@ -27,13 +27,34 @@ public class PaimonSchemaCacheValue extends SchemaCacheValue { private Table paimonTable; + private List partitionColumns; + private PaimonPartitionInfo partitionInfo; - public PaimonSchemaCacheValue(List schema, Table paimonTable) { + private long snapshootId; + + public PaimonSchemaCacheValue(List schema, List partitionColumns, Table paimonTable, + long snapshootId, + PaimonPartitionInfo partitionInfo) { super(schema); + this.partitionColumns = partitionColumns; this.paimonTable = paimonTable; + this.snapshootId = snapshootId; + this.partitionInfo = partitionInfo; } public Table getPaimonTable() { return paimonTable; } + + public List getPartitionColumns() { + return partitionColumns; + } + + public PaimonPartitionInfo getPartitionInfo() { + return partitionInfo; + } + + public long getSnapshootId() { + return snapshootId; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java new file mode 100644 index 00000000000000..ee3bb2ecc72217 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.hive.HiveUtil; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Projection; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +public class PaimonUtil { + public static List read( + Table table, @Nullable int[][] projection, Pair, String>... dynamicOptions) + throws IOException { + Map options = new HashMap<>(); + for (Pair, String> pair : dynamicOptions) { + options.put(pair.getKey().key(), pair.getValue()); + } + table = table.copy(options); + ReadBuilder readBuilder = table.newReadBuilder(); + if (projection != null) { + readBuilder.withProjection(projection); + } + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + InternalRowSerializer serializer = + new InternalRowSerializer( + projection == null + ? table.rowType() + : Projection.of(projection).project(table.rowType())); + List rows = new ArrayList<>(); + reader.forEachRemaining(row -> rows.add(serializer.copy(row))); + return rows; + } + + + /* + https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table + +---------------+----------------+--------------------+--------------------+------------------------+ + | partition | record_count | file_size_in_bytes| file_count| last_update_time| + +---------------+----------------+--------------------+--------------------+------------------------+ + | [1] | 1 | 645 | 1 | 2024-06-24 10:25:57.400| + +---------------+----------------+--------------------+--------------------+------------------------+ + org.apache.paimon.table.system.PartitionsTable.TABLE_TYPE + public static final RowType TABLE_TYPE = + new RowType( + Arrays.asList( + new DataField(0, "partition", SerializationUtils.newStringType(true)), + new DataField(1, "record_count", new BigIntType(false)), + new DataField(2, "file_size_in_bytes", new BigIntType(false)), + new DataField(3, "file_count", new BigIntType(false)), + new DataField(4, "last_update_time", DataTypes.TIMESTAMP_MILLIS()))); + */ + public static PaimonPartition rowToPartition(InternalRow row) { + String partition = row.getString(0).toString(); + long recordCount = row.getLong(1); + long fileSizeInBytes = row.getLong(2); + long fileCount = row.getLong(3); + long lastUpdateTime = row.getTimestamp(4, 3).getMillisecond(); + return new PaimonPartition(partition, recordCount, fileSizeInBytes, fileCount, lastUpdateTime); + } + + public static PaimonPartitionInfo generatePartitionInfo(List partitionColumns, + List paimonPartitions) throws AnalysisException { + Map nameToPartitionItem = Maps.newHashMap(); + Map nameToPartition = Maps.newHashMap(); + PaimonPartitionInfo partitionInfo = new PaimonPartitionInfo(nameToPartitionItem, nameToPartition); + if (CollectionUtils.isEmpty(partitionColumns)) { + return partitionInfo; + } + for (PaimonPartition paimonPartition : paimonPartitions) { + String partitionName = getPartitionName(partitionColumns, paimonPartition.getPartitionValues()); + nameToPartition.put(partitionName, paimonPartition); + nameToPartitionItem.put(partitionName, toListPartitionItem(partitionName, partitionColumns)); + } + return partitionInfo; + } + + private static String getPartitionName(List partitionColumns, String partitionValueStr) { + Preconditions.checkNotNull(partitionValueStr); + String[] partitionValues = partitionValueStr.replace("[", "").replace("]", "") + .split(","); + Preconditions.checkState(partitionColumns.size() == partitionValues.length); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < partitionColumns.size(); ++i) { + if (i != 0) { + sb.append("/"); + } + sb.append(partitionColumns.get(i).getName()).append("=").append(partitionValues[i]); + } + return sb.toString(); + } + + public static ListPartitionItem toListPartitionItem(String partitionName, List partitionColumns) + throws AnalysisException { + List types = partitionColumns.stream() + .map(Column::getType) + .collect(Collectors.toList()); + // Partition name will be in format: nation=cn/city=beijing + // parse it to get values "cn" and "beijing" + List partitionValues = HiveUtil.toPartitionValues(partitionName); + Preconditions.checkState(partitionValues.size() == types.size(), partitionName + " vs. " + types); + List values = Lists.newArrayListWithExpectedSize(types.size()); + for (String partitionValue : partitionValues) { + // null data will be 'null' + values.add(new PartitionValue(partitionValue, "null".equals(partitionValue))); + } + PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types, true); + ListPartitionItem listPartitionItem = new ListPartitionItem(Lists.newArrayList(key)); + return listPartitionItem; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index 4a8b14603ce4d6..5a21792f472b20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -47,7 +47,7 @@ public interface MTMVRelatedTableIf extends TableIf { * * @return */ - PartitionType getPartitionType(); + PartitionType getPartitionType() throws AnalysisException; /** * getPartitionColumnNames @@ -55,14 +55,14 @@ public interface MTMVRelatedTableIf extends TableIf { * @return * @throws DdlException */ - Set getPartitionColumnNames() throws DdlException; + Set getPartitionColumnNames() throws DdlException, AnalysisException; /** * getPartitionColumns * * @return */ - List getPartitionColumns(); + List getPartitionColumns() throws AnalysisException; /** * getPartitionSnapshot From d1a6780a8b51aeecee2bbc15c5acb759d6fb04e8 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Wed, 13 Nov 2024 19:58:03 +0800 Subject: [PATCH 02/24] 1 --- .../apache/doris/datasource/paimon/PaimonExternalTable.java | 4 ++-- .../apache/doris/datasource/paimon/PaimonPartitionInfo.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index dc190398934967..68cc8258d4bc93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -153,8 +153,8 @@ public Optional initSchema() { private long loadLatestSnapshotId() throws IOException { Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); - List rows = PaimonUtil.read(table, new int[][] {{0}});// snapshotId - List res = Lists.newArrayListWithCapacity(rows.size()); + // snapshotId + List rows = PaimonUtil.read(table, new int[][] {{0}}); long latestSnapshotId = 0L; for (InternalRow row : rows) { long snapshotId = row.getLong(0); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java index c3cf5dc4da313c..54385e57ef9bda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java @@ -23,7 +23,7 @@ public class PaimonPartitionInfo { private Map nameToPartitionItem; - private Map nameToPartition; + private Map nameToPartition; public PaimonPartitionInfo(Map nameToPartitionItem, Map nameToPartition) { From edbe24a6c98b12a2e1e7aa7c43ec8d1afb2b4581 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Wed, 13 Nov 2024 20:09:12 +0800 Subject: [PATCH 03/24] 1 --- .../datasource/paimon/PaimonExternalTable.java | 16 ++++++++-------- .../datasource/paimon/PaimonPartitionInfo.java | 7 +++++++ .../apache/doris/mtmv/MTMVRelatedTableIf.java | 6 +++--- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 68cc8258d4bc93..d60534b22e8192 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -94,20 +94,20 @@ public Table getPaimonTable() { return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null); } - public PaimonPartitionInfo getPartitionInfoFromCache() throws AnalysisException { + public PaimonPartitionInfo getPartitionInfoFromCache() { makeSureInitialized(); Optional schemaCacheValue = getSchemaCacheValue(); if (!schemaCacheValue.isPresent()) { - throw new AnalysisException("not present"); + return new PaimonPartitionInfo(); } return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); } - public List getPartitionColumnsFromCache() throws AnalysisException { + public List getPartitionColumnsFromCache() { makeSureInitialized(); Optional schemaCacheValue = getSchemaCacheValue(); if (!schemaCacheValue.isPresent()) { - throw new AnalysisException("not present"); + return Lists.newArrayList(); } return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionColumns(); } @@ -308,23 +308,23 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { } @Override - public Map getAndCopyPartitionItems() throws AnalysisException { + public Map getAndCopyPartitionItems() { return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); } @Override - public PartitionType getPartitionType() throws AnalysisException { + public PartitionType getPartitionType() { return getPartitionColumnsFromCache().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED; } @Override - public Set getPartitionColumnNames() throws DdlException, AnalysisException { + public Set getPartitionColumnNames() { return getPartitionColumnsFromCache().stream() .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); } @Override - public List getPartitionColumns() throws AnalysisException { + public List getPartitionColumns() { return getPartitionColumnsFromCache(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java index 54385e57ef9bda..8f54f0834e481b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java @@ -19,12 +19,19 @@ import org.apache.doris.catalog.PartitionItem; +import com.google.common.collect.Maps; + import java.util.Map; public class PaimonPartitionInfo { private Map nameToPartitionItem; private Map nameToPartition; + public PaimonPartitionInfo() { + this.nameToPartitionItem = Maps.newHashMap(); + this.nameToPartition = Maps.newHashMap(); + } + public PaimonPartitionInfo(Map nameToPartitionItem, Map nameToPartition) { this.nameToPartitionItem = nameToPartitionItem; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index 5a21792f472b20..4a8b14603ce4d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -47,7 +47,7 @@ public interface MTMVRelatedTableIf extends TableIf { * * @return */ - PartitionType getPartitionType() throws AnalysisException; + PartitionType getPartitionType(); /** * getPartitionColumnNames @@ -55,14 +55,14 @@ public interface MTMVRelatedTableIf extends TableIf { * @return * @throws DdlException */ - Set getPartitionColumnNames() throws DdlException, AnalysisException; + Set getPartitionColumnNames() throws DdlException; /** * getPartitionColumns * * @return */ - List getPartitionColumns() throws AnalysisException; + List getPartitionColumns(); /** * getPartitionSnapshot From 5af3e68bce6fbab68c66bc2dc83dd6c22d3d8f72 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 14 Nov 2024 11:41:11 +0800 Subject: [PATCH 04/24] 1 --- .../java/org/apache/doris/datasource/paimon/PaimonUtil.java | 5 ++++- .../trees/plans/commands/UpdateMvByPartitionCommand.java | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index ee3bb2ecc72217..53075081579c03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -141,7 +141,10 @@ public static ListPartitionItem toListPartitionItem(String partitionName, List values = Lists.newArrayListWithExpectedSize(types.size()); for (String partitionValue : partitionValues) { - // null data will be 'null' + // null will in partition 'null' + // "null" will in partition 'null' + // NULL will in partition 'null' + // "NULL" will in partition 'NULL' values.add(new PartitionValue(partitionValue, "null".equals(partitionValue))); } PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types, true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index de284bd837748f..d5634afb0a03e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -44,6 +44,7 @@ import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral; import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; +import org.apache.doris.nereids.trees.expressions.literal.StringLiteral; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; @@ -179,6 +180,8 @@ private static Expression convertListPartitionToIn(PartitionItem item, Slot col) inValues = inValues.stream() .filter(e -> !(e instanceof NullLiteral)) .collect(Collectors.toList()); + // paimon "null" will in NULL partition + inValues.add(new StringLiteral("null")); Expression isNullPredicate = new IsNull(col); predicates.add(isNullPredicate); } From 4ddad6441e4caa27fe8032048c96537d1847104d Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 14 Nov 2024 12:33:34 +0800 Subject: [PATCH 05/24] 1 --- .../apache/doris/datasource/paimon/PaimonExternalTable.java | 2 +- .../java/org/apache/doris/datasource/paimon/PaimonUtil.java | 3 ++- .../trees/plans/commands/UpdateMvByPartitionCommand.java | 3 --- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index d60534b22e8192..c734950d0c1f99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -345,6 +345,6 @@ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws Analys @Override public boolean isPartitionColumnAllowNull() { - return true; + return false; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 53075081579c03..8b7017cac29486 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -145,7 +145,8 @@ public static ListPartitionItem toListPartitionItem(String partitionName, List !(e instanceof NullLiteral)) .collect(Collectors.toList()); - // paimon "null" will in NULL partition - inValues.add(new StringLiteral("null")); Expression isNullPredicate = new IsNull(col); predicates.add(isNullPredicate); } From 2be1729e207abb657d42102bc4fd21bfedb60aef Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 14 Nov 2024 12:57:37 +0800 Subject: [PATCH 06/24] 1 --- .../org/apache/doris/datasource/paimon/PaimonExternalTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index c734950d0c1f99..d60534b22e8192 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -345,6 +345,6 @@ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws Analys @Override public boolean isPartitionColumnAllowNull() { - return false; + return true; } } From 0d58c44d0e83339972665e2f3c83ecb7ce07e266 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 14 Nov 2024 15:44:26 +0800 Subject: [PATCH 07/24] 1 --- .../paimon/PaimonExternalTable.java | 5 + .../suites/mtmv_p0/test_paimon_mtmv.groovy | 116 ++++++++++++------ 2 files changed, 82 insertions(+), 39 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index d60534b22e8192..586509b1517ef1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -345,6 +345,11 @@ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws Analys @Override public boolean isPartitionColumnAllowNull() { + // Paimon will write to the 'null' partition regardless of whether it is' null or 'null'. + // The logic is inconsistent with Doris' empty partition logic, so it needs to return false. + // However, when Spark creates Paimon tables, specifying 'not null' does not take effect. + // In order to successfully create the materialized view, false is returned here. + // The cost is that Paimon partition writes a null value, and the materialized view cannot detect this data. return true; } } diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy index e84eb497b2c7b1..36ada91cca6b7e 100644 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy @@ -15,48 +15,86 @@ // specific language governing permissions and limitations // under the License. -suite("test_paimon_mtmv", "p0,external,paimon,external_docker,external_docker_hive") { +suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_doris") { String enabled = context.config.otherConfigs.get("enablePaimonTest") - logger.info("enabled: " + enabled) + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled paimon test") + return + } + String suiteName = "test_paimon_mtmv" + String catalogName = "${suiteName}_catalog" + String mvName = "${suiteName}_mv" + + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") - logger.info("externalEnvIp: " + externalEnvIp) - String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort") - logger.info("hdfs_port: " + hdfs_port) - if (enabled != null && enabled.equalsIgnoreCase("true")) { - String catalog_name = "paimon_mtmv_catalog"; - String mvName = "test_paimon_mtmv" - String dbName = "regression_test_mtmv_p0" - String paimonDb = "db1" - String paimonTable = "all_table" - sql """drop catalog if exists ${catalog_name} """ - - sql """create catalog if not exists ${catalog_name} properties ( - "type" = "paimon", - "paimon.catalog.type"="filesystem", - "warehouse" = "hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1" + + sql """drop catalog if exists ${catalogName}""" + sql """CREATE CATALOG ${catalogName} PROPERTIES ( + 'type'='paimon', + 'warehouse' = 's3://warehouse/wh/', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" );""" - order_qt_catalog """select * from ${catalog_name}.${paimonDb}.${paimonTable}""" - sql """drop materialized view if exists ${mvName};""" - - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD DEFERRED REFRESH AUTO ON MANUAL - DISTRIBUTED BY RANDOM BUCKETS 2 - PROPERTIES ('replication_num' = '1') - AS - SELECT * FROM ${catalog_name}.${paimonDb}.${paimonTable}; - """ - - sql """ - REFRESH MATERIALIZED VIEW ${mvName} complete - """ - def jobName = getJobName(dbName, mvName); - waitingMTMVTaskFinished(jobName) - order_qt_mtmv "SELECT * FROM ${mvName}" - - sql """drop materialized view if exists ${mvName};""" - sql """ drop catalog if exists ${catalog_name} """ - } + qt_order """ select * from ${catalogName}.test_paimon_spark.test_tb_mix_format order by par,id; """ + + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`par`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format; + """ + def showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_a")) + assertTrue(showPartitionsResult.toString().contains("p_b")) + + // refresh one partitions + sql """ + REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a); + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_one_partition "SELECT * FROM ${mvName} order by par,id" + + //refresh auto + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_auto "SELECT * FROM ${mvName} order by par,id" + + order_qt_is_sync_before_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + // rebuild catalog, should not Affects MTMV + sql """drop catalog if exists ${catalogName}""" + sql """ + sql """CREATE CATALOG ${catalogName} PROPERTIES ( + 'type'='paimon', + 'warehouse' = 's3://warehouse/wh/', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + """ + + order_qt_is_sync_after_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + + // should refresh normal after catalog rebuild + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinished(jobName) + order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} order by par,id" + + sql """drop materialized view if exists ${mvName};""" + sql """drop catalog if exists ${catalogName}""" + } From e360a249342797edbf615210b712aa09760ddb4e Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 14 Nov 2024 15:48:36 +0800 Subject: [PATCH 08/24] 1 --- regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy index 36ada91cca6b7e..f5d864352e4efe 100644 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy @@ -74,14 +74,14 @@ suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_dori // rebuild catalog, should not Affects MTMV sql """drop catalog if exists ${catalogName}""" sql """ - sql """CREATE CATALOG ${catalogName} PROPERTIES ( + CREATE CATALOG ${catalogName} PROPERTIES ( 'type'='paimon', 'warehouse' = 's3://warehouse/wh/', "s3.access_key" = "admin", "s3.secret_key" = "password", "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", "s3.region" = "us-east-1" - );""" + ); """ order_qt_is_sync_after_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" From 63313e1cb3bc3523a0ff75f22b510e8aafb30d8b Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 14 Nov 2024 15:50:49 +0800 Subject: [PATCH 09/24] 1 --- regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy index f5d864352e4efe..eaeebf946b7292 100644 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy @@ -90,7 +90,7 @@ suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_dori sql """ REFRESH MATERIALIZED VIEW ${mvName} complete """ - waitingMTMVTaskFinished(jobName) + waitingMTMVTaskFinishedByMvName(mvName) order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} order by par,id" sql """drop materialized view if exists ${mvName};""" From 637581516ebef72889b941faae04982998ddd09b Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 14 Nov 2024 15:54:53 +0800 Subject: [PATCH 10/24] 1 --- regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy index eaeebf946b7292..6fb4b49d7318c7 100644 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy @@ -24,6 +24,7 @@ suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_dori String suiteName = "test_paimon_mtmv" String catalogName = "${suiteName}_catalog" String mvName = "${suiteName}_mv" + String dbName = context.config.getDbNameByFile(context.file) String minio_port = context.config.otherConfigs.get("iceberg_minio_port") String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") From f66b585db686d3bcec7a095184f34087419baad3 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 14 Nov 2024 15:58:44 +0800 Subject: [PATCH 11/24] 1 --- regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy index 6fb4b49d7318c7..d83aa76f1d067b 100644 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy @@ -39,7 +39,7 @@ suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_dori "s3.region" = "us-east-1" );""" - qt_order """ select * from ${catalogName}.test_paimon_spark.test_tb_mix_format order by par,id; """ + order_qt """ select * from ${catalogName}.test_paimon_spark.test_tb_mix_format order by par,id; """ sql """drop materialized view if exists ${mvName};""" From 74f5e6dd6f1da1af221446e73127ac35ebaa01a4 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 14 Nov 2024 16:00:31 +0800 Subject: [PATCH 12/24] 1 --- regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy index d83aa76f1d067b..83dd4ab5405df3 100644 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy @@ -39,7 +39,7 @@ suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_dori "s3.region" = "us-east-1" );""" - order_qt """ select * from ${catalogName}.test_paimon_spark.test_tb_mix_format order by par,id; """ + order_qt_base_table """ select * from ${catalogName}.test_paimon_spark.test_tb_mix_format order by par,id; """ sql """drop materialized view if exists ${mvName};""" From 426ffcbe87cae3b06ec9bfd0573da9eb7d711de3 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 14 Nov 2024 16:09:55 +0800 Subject: [PATCH 13/24] 1 --- .../suites/mtmv_p0/test_paimon_mtmv.groovy | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy index 83dd4ab5405df3..8d8b996dea1784 100644 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy @@ -39,7 +39,7 @@ suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_dori "s3.region" = "us-east-1" );""" - order_qt_base_table """ select * from ${catalogName}.test_paimon_spark.test_tb_mix_format order by par,id; """ + order_qt_base_table """ select * from ${catalogName}.test_paimon_spark.test_tb_mix_format ; """ sql """drop materialized view if exists ${mvName};""" @@ -62,16 +62,16 @@ suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_dori REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a); """ waitingMTMVTaskFinishedByMvName(mvName) - order_qt_refresh_one_partition "SELECT * FROM ${mvName} order by par,id" + order_qt_refresh_one_partition "SELECT * FROM ${mvName} " //refresh auto sql """ REFRESH MATERIALIZED VIEW ${mvName} auto """ waitingMTMVTaskFinishedByMvName(mvName) - order_qt_refresh_auto "SELECT * FROM ${mvName} order by par,id" - + order_qt_refresh_auto "SELECT * FROM ${mvName} " order_qt_is_sync_before_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + // rebuild catalog, should not Affects MTMV sql """drop catalog if exists ${catalogName}""" sql """ @@ -84,7 +84,6 @@ suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_dori "s3.region" = "us-east-1" ); """ - order_qt_is_sync_after_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" // should refresh normal after catalog rebuild @@ -92,7 +91,26 @@ suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_dori REFRESH MATERIALIZED VIEW ${mvName} complete """ waitingMTMVTaskFinishedByMvName(mvName) - order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} order by par,id" + order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} " + + sql """drop materialized view if exists ${mvName};""" + + // not have partition + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format; + """ + + //should can refresh auto + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_not_partition "SELECT * FROM ${mvName} " sql """drop materialized view if exists ${mvName};""" sql """drop catalog if exists ${catalogName}""" From 3f48b2986d79f5568f47cc94b2d4aa9bd9ad19ce Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 14 Nov 2024 16:13:04 +0800 Subject: [PATCH 14/24] 1 --- regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy index 8d8b996dea1784..f2989edbf6dfd6 100644 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy @@ -104,14 +104,14 @@ suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_dori AS SELECT * FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format; """ - + order_qt_not_partition_before "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" //should can refresh auto sql """ REFRESH MATERIALIZED VIEW ${mvName} auto """ waitingMTMVTaskFinishedByMvName(mvName) order_qt_not_partition "SELECT * FROM ${mvName} " - + order_qt_not_partition_after "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" sql """drop materialized view if exists ${mvName};""" sql """drop catalog if exists ${catalogName}""" From 9496c3002e49ca0ecfc05e41bf7d4de61ea361cd Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 14 Nov 2024 16:15:06 +0800 Subject: [PATCH 15/24] 1 --- .../data/mtmv_p0/test_paimon_mtmv.out | 116 +++++++++++++++++- 1 file changed, 110 insertions(+), 6 deletions(-) diff --git a/regression-test/data/mtmv_p0/test_paimon_mtmv.out b/regression-test/data/mtmv_p0/test_paimon_mtmv.out index c654cb01214f57..c28b7cb7baca22 100644 --- a/regression-test/data/mtmv_p0/test_paimon_mtmv.out +++ b/regression-test/data/mtmv_p0/test_paimon_mtmv.out @@ -1,9 +1,113 @@ -- This file is automatically generated. You should know what you did if you want to edit this --- !catalog -- -1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 -10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 +-- !base_table -- +1 2 a +1 2 b +10 1 a +10 1 b +2 2 a +2 2 b +3 2 a +3 2 b +4 2 a +4 2 b +5 2 a +5 2 b +6 1 a +6 1 b +7 1 a +7 1 b +8 1 a +8 1 b +9 1 a +9 1 b --- !mtmv -- -1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 -10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 +-- !refresh_one_partition -- +1 2 a +10 1 a +2 2 a +3 2 a +4 2 a +5 2 a +6 1 a +7 1 a +8 1 a +9 1 a + +-- !refresh_auto -- +1 2 a +1 2 b +10 1 a +10 1 b +2 2 a +2 2 b +3 2 a +3 2 b +4 2 a +4 2 b +5 2 a +5 2 b +6 1 a +6 1 b +7 1 a +7 1 b +8 1 a +8 1 b +9 1 a +9 1 b + +-- !is_sync_before_rebuild -- +true + +-- !is_sync_after_rebuild -- +true + +-- !refresh_complete_rebuild -- +1 2 a +1 2 b +10 1 a +10 1 b +2 2 a +2 2 b +3 2 a +3 2 b +4 2 a +4 2 b +5 2 a +5 2 b +6 1 a +6 1 b +7 1 a +7 1 b +8 1 a +8 1 b +9 1 a +9 1 b + +-- !not_partition_before -- +false + +-- !not_partition -- +1 2 a +1 2 b +10 1 a +10 1 b +2 2 a +2 2 b +3 2 a +3 2 b +4 2 a +4 2 b +5 2 a +5 2 b +6 1 a +6 1 b +7 1 a +7 1 b +8 1 a +8 1 b +9 1 a +9 1 b + +-- !not_partition_after -- +true From aba5fb4737810a69d7bf6617bfca59a20c8fb93c Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 14 Nov 2024 16:24:48 +0800 Subject: [PATCH 16/24] 1 --- .../apache/doris/datasource/paimon/PaimonExternalTable.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 586509b1517ef1..78d66959dbb66c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -44,6 +44,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.paimon.catalog.Catalog; @@ -166,6 +167,9 @@ private long loadLatestSnapshotId() throws IOException { } private PaimonPartitionInfo loadPartitionInfo(List partitionColumns) throws IOException, AnalysisException { + if (CollectionUtils.isEmpty(partitionColumns)) { + return new PaimonPartitionInfo(); + } List paimonPartitions = loadPartitions(); return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); } From 6d0d00b675cd690126d1f89ed255018a7da3ffba Mon Sep 17 00:00:00 2001 From: zhangdong Date: Fri, 15 Nov 2024 18:57:42 +0800 Subject: [PATCH 17/24] add ut --- .../org/apache/doris/mtmv/PaimonUtilTest.java | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java new file mode 100644 index 00000000000000..789af7bf8357ac --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.paimon.PaimonPartition; +import org.apache.doris.datasource.paimon.PaimonPartitionInfo; +import org.apache.doris.datasource.paimon.PaimonUtil; + +import com.google.common.collect.Lists; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class PaimonUtilTest { + + @Test + public void testGeneratePartitionInfo() throws AnalysisException { + Column k1 = new Column("k1", PrimitiveType.INT); + Column k2 = new Column("k2", PrimitiveType.VARCHAR); + List partitionColumns = Lists.newArrayList(k1, k2); + PaimonPartition p1 = new PaimonPartition("[1,aa]", 2, 3, 4, 5); + List paimonPartitions = Lists.newArrayList(p1); + PaimonPartitionInfo partitionInfo = PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); + String expectPartitionName = "k1=1/k2=aa"; + Assert.assertTrue(partitionInfo.getNameToPartitionItem().containsKey(expectPartitionName)); + PartitionItem partitionItem = partitionInfo.getNameToPartitionItem().get(expectPartitionName); + List keys = partitionItem.getItems(); + Assert.assertEquals(1, keys.size()); + PartitionKey partitionKey = keys.get(0); + List exprs = partitionKey.getKeys(); + Assert.assertEquals(2, exprs.size()); + Assert.assertEquals(1, exprs.get(0).getLongValue()); + Assert.assertEquals("aa", exprs.get(1).getStringValue()); + } + + @Test + public void testRowToPartition() { + GenericRow row = GenericRow.of(BinaryString.fromString("[1,b]"), 2L, 3L, 4L, Timestamp.fromEpochMillis(5L)); + PaimonPartition paimonPartition = PaimonUtil.rowToPartition(row); + Assert.assertEquals("[1,b]", paimonPartition.getPartitionValues()); + Assert.assertEquals(2L, paimonPartition.getRecordCount()); + Assert.assertEquals(3L, paimonPartition.getFileSizeInBytes()); + Assert.assertEquals(4L, paimonPartition.getFileCount()); + Assert.assertEquals(5L, paimonPartition.getLastUpdateTime()); + } +} From 2206584ca51e6df07dce6d560f32f8c24ed27f73 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Mon, 18 Nov 2024 20:14:01 +0800 Subject: [PATCH 18/24] drop case test pipeline --- .../suites/mtmv_p0/test_paimon_mtmv.groovy | 119 ------------------ 1 file changed, 119 deletions(-) delete mode 100644 regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy deleted file mode 100644 index f2989edbf6dfd6..00000000000000 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ /dev/null @@ -1,119 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_doris") { - String enabled = context.config.otherConfigs.get("enablePaimonTest") - if (enabled == null || !enabled.equalsIgnoreCase("true")) { - logger.info("disabled paimon test") - return - } - String suiteName = "test_paimon_mtmv" - String catalogName = "${suiteName}_catalog" - String mvName = "${suiteName}_mv" - String dbName = context.config.getDbNameByFile(context.file) - - String minio_port = context.config.otherConfigs.get("iceberg_minio_port") - String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") - - sql """drop catalog if exists ${catalogName}""" - sql """CREATE CATALOG ${catalogName} PROPERTIES ( - 'type'='paimon', - 'warehouse' = 's3://warehouse/wh/', - "s3.access_key" = "admin", - "s3.secret_key" = "password", - "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", - "s3.region" = "us-east-1" - );""" - - order_qt_base_table """ select * from ${catalogName}.test_paimon_spark.test_tb_mix_format ; """ - - sql """drop materialized view if exists ${mvName};""" - - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD DEFERRED REFRESH AUTO ON MANUAL - partition by(`par`) - DISTRIBUTED BY RANDOM BUCKETS 2 - PROPERTIES ('replication_num' = '1') - AS - SELECT * FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format; - """ - def showPartitionsResult = sql """show partitions from ${mvName}""" - logger.info("showPartitionsResult: " + showPartitionsResult.toString()) - assertTrue(showPartitionsResult.toString().contains("p_a")) - assertTrue(showPartitionsResult.toString().contains("p_b")) - - // refresh one partitions - sql """ - REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a); - """ - waitingMTMVTaskFinishedByMvName(mvName) - order_qt_refresh_one_partition "SELECT * FROM ${mvName} " - - //refresh auto - sql """ - REFRESH MATERIALIZED VIEW ${mvName} auto - """ - waitingMTMVTaskFinishedByMvName(mvName) - order_qt_refresh_auto "SELECT * FROM ${mvName} " - order_qt_is_sync_before_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" - - // rebuild catalog, should not Affects MTMV - sql """drop catalog if exists ${catalogName}""" - sql """ - CREATE CATALOG ${catalogName} PROPERTIES ( - 'type'='paimon', - 'warehouse' = 's3://warehouse/wh/', - "s3.access_key" = "admin", - "s3.secret_key" = "password", - "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", - "s3.region" = "us-east-1" - ); - """ - order_qt_is_sync_after_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" - - // should refresh normal after catalog rebuild - sql """ - REFRESH MATERIALIZED VIEW ${mvName} complete - """ - waitingMTMVTaskFinishedByMvName(mvName) - order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} " - - sql """drop materialized view if exists ${mvName};""" - - // not have partition - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD DEFERRED REFRESH AUTO ON MANUAL - DISTRIBUTED BY RANDOM BUCKETS 2 - PROPERTIES ('replication_num' = '1') - AS - SELECT * FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format; - """ - order_qt_not_partition_before "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" - //should can refresh auto - sql """ - REFRESH MATERIALIZED VIEW ${mvName} auto - """ - waitingMTMVTaskFinishedByMvName(mvName) - order_qt_not_partition "SELECT * FROM ${mvName} " - order_qt_not_partition_after "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" - sql """drop materialized view if exists ${mvName};""" - sql """drop catalog if exists ${catalogName}""" - -} - From 90b6ae9038feab8be0b175fbd06c06adb81ce817 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 19 Nov 2024 10:22:01 +0800 Subject: [PATCH 19/24] drop case test pipeline --- .../suites/mtmv_p0/test_paimon_mtmv.groovy | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy new file mode 100644 index 00000000000000..8a186b428bce77 --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled paimon test") + return + } + String suiteName = "test_paimon_mtmv" + String catalogName = "${suiteName}_catalog" + String mvName = "${suiteName}_mv" + String dbName = context.config.getDbNameByFile(context.file) + + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalogName}""" + sql """CREATE CATALOG ${catalogName} PROPERTIES ( + 'type'='paimon', + 'warehouse' = 's3://warehouse/wh/', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + order_qt_base_table """ select * from ${catalogName}.test_paimon_spark.test_tb_mix_format ; """ + + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + partition by(`par`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format; + """ + def showPartitionsResult = sql """show partitions from ${mvName}""" + logger.info("showPartitionsResult: " + showPartitionsResult.toString()) + assertTrue(showPartitionsResult.toString().contains("p_a")) + assertTrue(showPartitionsResult.toString().contains("p_b")) + + // refresh one partitions + sql """ + REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a); + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_one_partition "SELECT * FROM ${mvName} " + + //refresh auto + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_auto "SELECT * FROM ${mvName} " + + sql """drop materialized view if exists ${mvName};""" + sql """drop catalog if exists ${catalogName}""" + +} + From cf7ef0d97879f358a07824312c5e41558ae50ff0 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 19 Nov 2024 15:27:02 +0800 Subject: [PATCH 20/24] not drop catalog --- .../suites/mtmv_p0/test_paimon_mtmv.groovy | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy index 8a186b428bce77..f00add2b53c71e 100644 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy @@ -70,9 +70,48 @@ suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_dori """ waitingMTMVTaskFinishedByMvName(mvName) order_qt_refresh_auto "SELECT * FROM ${mvName} " + order_qt_is_sync_before_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" - sql """drop materialized view if exists ${mvName};""" + // rebuild catalog, should not Affects MTMV sql """drop catalog if exists ${catalogName}""" + sql """ + CREATE CATALOG ${catalogName} PROPERTIES ( + 'type'='paimon', + 'warehouse' = 's3://warehouse/wh/', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + ); + """ + order_qt_is_sync_after_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + + // should refresh normal after catalog rebuild + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} " + + sql """drop materialized view if exists ${mvName};""" + // not have partition + sql """ + CREATE MATERIALIZED VIEW ${mvName} + BUILD DEFERRED REFRESH AUTO ON MANUAL + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${catalogName}.`test_paimon_spark`.test_tb_mix_format; + """ + order_qt_not_partition_before "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + //should can refresh auto + sql """ + REFRESH MATERIALIZED VIEW ${mvName} auto + """ + waitingMTMVTaskFinishedByMvName(mvName) + order_qt_not_partition "SELECT * FROM ${mvName} " + order_qt_not_partition_after "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + sql """drop materialized view if exists ${mvName};""" } From 99bcd99d31ddee9936558f98d9e9540888a23a79 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 19 Nov 2024 16:15:41 +0800 Subject: [PATCH 21/24] 1 --- fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java | 3 ++- fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java | 4 ++-- regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy | 2 ++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 2b213d0558385a..15c8df9195ddf1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -1022,7 +1022,8 @@ public void processAlterMTMV(AlterMTMV alterMTMV, boolean isReplay) { mtmv.alterMvProperties(alterMTMV.getMvProperties()); break; case ADD_TASK: - mtmv.addTaskResult(alterMTMV.getTask(), alterMTMV.getRelation(), alterMTMV.getPartitionSnapshots()); + mtmv.addTaskResult(alterMTMV.getTask(), alterMTMV.getRelation(), alterMTMV.getPartitionSnapshots(), + isReplay); break; default: throw new RuntimeException("Unknown type value: " + alterMTMV.getOpType()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index b0d25ad2b252b2..f75d817efc5542 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -190,11 +190,11 @@ public MTMVStatus alterStatus(MTMVStatus newStatus) { } public void addTaskResult(MTMVTask task, MTMVRelation relation, - Map partitionSnapshots) { + Map partitionSnapshots, boolean isReplay) { MTMVCache mtmvCache = null; boolean needUpdateCache = false; if (task.getStatus() == TaskStatus.SUCCESS && !Env.isCheckpointThread() - && !Config.enable_check_compatibility_mode) { + && !Config.enable_check_compatibility_mode && !isReplay) { needUpdateCache = true; try { // shouldn't do this while holding mvWriteLock diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy index f00add2b53c71e..f2989edbf6dfd6 100644 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy @@ -113,5 +113,7 @@ suite("test_paimon_mtmv", "p0,external,mtmv,external_docker,external_docker_dori order_qt_not_partition "SELECT * FROM ${mvName} " order_qt_not_partition_after "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" sql """drop materialized view if exists ${mvName};""" + sql """drop catalog if exists ${catalogName}""" + } From 96d7beab7a4e66fd34f4d599e96883f7f8708b3e Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 19 Nov 2024 16:43:53 +0800 Subject: [PATCH 22/24] 1 --- .../src/main/java/org/apache/doris/catalog/MTMV.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index f75d817efc5542..955bfd4279fd5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -194,11 +194,15 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation, MTMVCache mtmvCache = null; boolean needUpdateCache = false; if (task.getStatus() == TaskStatus.SUCCESS && !Env.isCheckpointThread() - && !Config.enable_check_compatibility_mode && !isReplay) { + && !Config.enable_check_compatibility_mode) { needUpdateCache = true; try { - // shouldn't do this while holding mvWriteLock - mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true); + // The replay thread may not have initialized the catalog yet to avoid getting stuck due + // to connection issues such as S3, so it is directly set to null + if (!isReplay) { + // shouldn't do this while holding mvWriteLock + mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true); + } } catch (Throwable e) { mtmvCache = null; LOG.warn("generate cache failed", e); From 080627339c6c4aa1585a67805563c49dda3631d6 Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 19 Nov 2024 20:09:01 +0800 Subject: [PATCH 23/24] roll back --- .../src/main/java/org/apache/doris/alter/Alter.java | 3 +-- .../src/main/java/org/apache/doris/catalog/MTMV.java | 10 +++------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 15c8df9195ddf1..2b213d0558385a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -1022,8 +1022,7 @@ public void processAlterMTMV(AlterMTMV alterMTMV, boolean isReplay) { mtmv.alterMvProperties(alterMTMV.getMvProperties()); break; case ADD_TASK: - mtmv.addTaskResult(alterMTMV.getTask(), alterMTMV.getRelation(), alterMTMV.getPartitionSnapshots(), - isReplay); + mtmv.addTaskResult(alterMTMV.getTask(), alterMTMV.getRelation(), alterMTMV.getPartitionSnapshots()); break; default: throw new RuntimeException("Unknown type value: " + alterMTMV.getOpType()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index 955bfd4279fd5c..b0d25ad2b252b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -190,19 +190,15 @@ public MTMVStatus alterStatus(MTMVStatus newStatus) { } public void addTaskResult(MTMVTask task, MTMVRelation relation, - Map partitionSnapshots, boolean isReplay) { + Map partitionSnapshots) { MTMVCache mtmvCache = null; boolean needUpdateCache = false; if (task.getStatus() == TaskStatus.SUCCESS && !Env.isCheckpointThread() && !Config.enable_check_compatibility_mode) { needUpdateCache = true; try { - // The replay thread may not have initialized the catalog yet to avoid getting stuck due - // to connection issues such as S3, so it is directly set to null - if (!isReplay) { - // shouldn't do this while holding mvWriteLock - mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true); - } + // shouldn't do this while holding mvWriteLock + mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true); } catch (Throwable e) { mtmvCache = null; LOG.warn("generate cache failed", e); From 4b3089996533ab3db933fa4453fe146160f9fadf Mon Sep 17 00:00:00 2001 From: zhangdong Date: Tue, 19 Nov 2024 20:22:33 +0800 Subject: [PATCH 24/24] comment --- .../datasource/paimon/PaimonExternalTable.java | 4 ++-- .../doris/datasource/paimon/PaimonPartition.java | 16 +++++++++++----- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 78d66959dbb66c..5645c4e89e726c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -95,7 +95,7 @@ public Table getPaimonTable() { return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null); } - public PaimonPartitionInfo getPartitionInfoFromCache() { + private PaimonPartitionInfo getPartitionInfoFromCache() { makeSureInitialized(); Optional schemaCacheValue = getSchemaCacheValue(); if (!schemaCacheValue.isPresent()) { @@ -104,7 +104,7 @@ public PaimonPartitionInfo getPartitionInfoFromCache() { return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); } - public List getPartitionColumnsFromCache() { + private List getPartitionColumnsFromCache() { makeSureInitialized(); Optional schemaCacheValue = getSchemaCacheValue(); if (!schemaCacheValue.isPresent()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java index d0cccd0c040970..545448199b3375 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java @@ -17,12 +17,18 @@ package org.apache.doris.datasource.paimon; +// https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table public class PaimonPartition { - private String partitionValues; - private long recordCount; - private long fileSizeInBytes; - private long fileCount; - private long lastUpdateTime; + // Partition values, for example: [1, dd] + private final String partitionValues; + // The amount of data in the partition + private final long recordCount; + // Partition file size + private final long fileSizeInBytes; + // Number of partition files + private final long fileCount; + // Last update time of partition + private final long lastUpdateTime; public PaimonPartition(String partitionValues, long recordCount, long fileSizeInBytes, long fileCount, long lastUpdateTime) {