Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat](mtmv)mtmv support paimon partition refresh #43959

Merged
merged 29 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,22 @@
package org.apache.doris.datasource.paimon;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.mtmv.MTMVBaseTableIf;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import org.apache.doris.mtmv.MTMVVersionSnapshot;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
Expand All @@ -30,25 +42,35 @@
import org.apache.doris.thrift.TTableType;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.system.PartitionsTable;
import org.apache.paimon.table.system.SnapshotsTable;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class PaimonExternalTable extends ExternalTable {
public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf {

private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class);

Expand All @@ -73,18 +95,95 @@ public Table getPaimonTable() {
return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null);
}

private PaimonPartitionInfo getPartitionInfoFromCache() {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
if (!schemaCacheValue.isPresent()) {
return new PaimonPartitionInfo();
}
return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
}

private List<Column> getPartitionColumnsFromCache() {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
if (!schemaCacheValue.isPresent()) {
return Lists.newArrayList();
}
return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionColumns();
}

public long getLatestSnapshotIdFromCache() throws AnalysisException {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
if (!schemaCacheValue.isPresent()) {
throw new AnalysisException("not present");
}
return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getSnapshootId();
}

@Override
public Optional<SchemaCacheValue> initSchema() {
Table paimonTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name);
TableSchema schema = ((FileStoreTable) paimonTable).schema();
List<DataField> columns = schema.fields();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
Set<String> partitionColumnNames = Sets.newHashSet(paimonTable.partitionKeys());
List<Column> partitionColumns = Lists.newArrayList();
for (DataField field : columns) {
tmpSchema.add(new Column(field.name().toLowerCase(),
Column column = new Column(field.name().toLowerCase(),
paimonTypeToDorisType(field.type()), true, null, true, field.description(), true,
field.id()));
field.id());
tmpSchema.add(column);
if (partitionColumnNames.contains(field.name())) {
partitionColumns.add(column);
}
}
try {
// after 0.9.0 paimon will support table.getLatestSnapshotId()
long latestSnapshotId = loadLatestSnapshotId();
PaimonPartitionInfo partitionInfo = loadPartitionInfo(partitionColumns);
return Optional.of(new PaimonSchemaCacheValue(tmpSchema, partitionColumns, paimonTable, latestSnapshotId,
partitionInfo));
} catch (IOException | AnalysisException e) {
LOG.warn(e);
return Optional.empty();
}
}

private long loadLatestSnapshotId() throws IOException {
Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName,
name + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS);
// snapshotId
List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}});
long latestSnapshotId = 0L;
for (InternalRow row : rows) {
long snapshotId = row.getLong(0);
if (snapshotId > latestSnapshotId) {
latestSnapshotId = snapshotId;
}
}
return latestSnapshotId;
}

private PaimonPartitionInfo loadPartitionInfo(List<Column> partitionColumns) throws IOException, AnalysisException {
if (CollectionUtils.isEmpty(partitionColumns)) {
return new PaimonPartitionInfo();
}
List<PaimonPartition> paimonPartitions = loadPartitions();
return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions);
}

private List<PaimonPartition> loadPartitions()
throws IOException {
Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName,
name + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS);
List<InternalRow> rows = PaimonUtil.read(table, null);
List<PaimonPartition> res = Lists.newArrayListWithCapacity(rows.size());
for (InternalRow row : rows) {
res.add(PaimonUtil.rowToPartition(row));
}
return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable));
return res;
}

private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
Expand Down Expand Up @@ -205,4 +304,56 @@ public long fetchRowCount() {
}
return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
}

@Override
public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
Env.getCurrentEnv().getRefreshManager()
.refreshTable(getCatalog().getName(), getDbName(), getName(), true);
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems() {
return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
}

@Override
public PartitionType getPartitionType() {
return getPartitionColumnsFromCache().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED;
}

@Override
public Set<String> getPartitionColumnNames() {
return getPartitionColumnsFromCache().stream()
.map(c -> c.getName().toLowerCase()).collect(Collectors.toSet());
}

@Override
public List<Column> getPartitionColumns() {
return getPartitionColumnsFromCache();
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
throws AnalysisException {
PaimonPartition paimonPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
if (paimonPartition == null) {
throw new AnalysisException("can not find partition: " + partitionName);
}
return new MTMVTimestampSnapshot(paimonPartition.getLastUpdateTime());
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
}

@Override
public boolean isPartitionColumnAllowNull() {
// Paimon will write to the 'null' partition regardless of whether it is' null or 'null'.
// The logic is inconsistent with Doris' empty partition logic, so it needs to return false.
// However, when Spark creates Paimon tables, specifying 'not null' does not take effect.
// In order to successfully create the materialized view, false is returned here.
// The cost is that Paimon partition writes a null value, and the materialized view cannot detect this data.
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.paimon;

// https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table
public class PaimonPartition {
// Partition values, for example: [1, dd]
private final String partitionValues;
// The amount of data in the partition
private final long recordCount;
// Partition file size
private final long fileSizeInBytes;
// Number of partition files
private final long fileCount;
// Last update time of partition
private final long lastUpdateTime;

public PaimonPartition(String partitionValues, long recordCount, long fileSizeInBytes, long fileCount,
long lastUpdateTime) {
this.partitionValues = partitionValues;
this.recordCount = recordCount;
this.fileSizeInBytes = fileSizeInBytes;
this.fileCount = fileCount;
this.lastUpdateTime = lastUpdateTime;
}

public String getPartitionValues() {
return partitionValues;
}

public long getRecordCount() {
return recordCount;
}

public long getFileSizeInBytes() {
return fileSizeInBytes;
}

public long getFileCount() {
return fileCount;
}

public long getLastUpdateTime() {
return lastUpdateTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.paimon;

import org.apache.doris.catalog.PartitionItem;

import com.google.common.collect.Maps;

import java.util.Map;

public class PaimonPartitionInfo {
private Map<String, PartitionItem> nameToPartitionItem;
private Map<String, PaimonPartition> nameToPartition;

public PaimonPartitionInfo() {
this.nameToPartitionItem = Maps.newHashMap();
this.nameToPartition = Maps.newHashMap();
}

public PaimonPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
Map<String, PaimonPartition> nameToPartition) {
this.nameToPartitionItem = nameToPartitionItem;
this.nameToPartition = nameToPartition;
}

public Map<String, PartitionItem> getNameToPartitionItem() {
return nameToPartitionItem;
}

public Map<String, PaimonPartition> getNameToPartition() {
return nameToPartition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,34 @@
public class PaimonSchemaCacheValue extends SchemaCacheValue {

private Table paimonTable;
private List<Column> partitionColumns;
private PaimonPartitionInfo partitionInfo;

public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
private long snapshootId;

public PaimonSchemaCacheValue(List<Column> schema, List<Column> partitionColumns, Table paimonTable,
long snapshootId,
PaimonPartitionInfo partitionInfo) {
super(schema);
this.partitionColumns = partitionColumns;
this.paimonTable = paimonTable;
this.snapshootId = snapshootId;
this.partitionInfo = partitionInfo;
}

public Table getPaimonTable() {
return paimonTable;
}

public List<Column> getPartitionColumns() {
return partitionColumns;
}

public PaimonPartitionInfo getPartitionInfo() {
return partitionInfo;
}

public long getSnapshootId() {
return snapshootId;
}
}
Loading
Loading