Skip to content

Commit

Permalink
[enhance](mtmv)Enable the MTMVRelatedTableIf interface to support mvcc (
Browse files Browse the repository at this point in the history
#44419)

When using the mvcc table to obtain partition snapshots and other
operations, the snapshotId parameter needs to be included
  • Loading branch information
zddr authored Nov 22, 2024
1 parent 9c1f6ab commit 5edc7f9
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 23 deletions.
7 changes: 4 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -363,7 +364,7 @@ public MTMVRefreshSnapshot getRefreshSnapshot() {
* @return mvPartitionName ==> mvPartitionKeyDesc
*/
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() throws AnalysisException {
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems();
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems(OptionalLong.empty());
Map<String, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<String, PartitionItem> entry : mtmvItems.entrySet()) {
result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc());
Expand Down Expand Up @@ -392,7 +393,7 @@ public Pair<Map<String, Set<String>>, Map<String, String>> calculateDoublyPartit
Map<String, String> baseToMv = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems(OptionalLong.empty());
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
Set<String> basePartitionNames = relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(),
Sets.newHashSet());
Expand Down Expand Up @@ -425,7 +426,7 @@ public Map<String, Set<String>> calculatePartitionMappings() throws AnalysisExce
Map<String, Set<String>> res = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems(OptionalLong.empty());
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
res.put(entry.getKey(),
relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -3260,7 +3261,7 @@ public PartitionType getPartitionType() {
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException {
public Map<String, PartitionItem> getAndCopyPartitionItems(OptionalLong snapshotId) throws AnalysisException {
if (!tryReadLock(1, TimeUnit.MINUTES)) {
throw new AnalysisException("get table read lock timeout, database=" + getDBName() + ",table=" + getName());
}
Expand All @@ -3284,7 +3285,8 @@ public List<Column> getPartitionColumns() {
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
OptionalLong snapshotId)
throws AnalysisException {
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions();
long partitionId = getPartitionOrAnalysisException(partitionName).getId();
Expand All @@ -3294,7 +3296,7 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId) {
Map<Long, Long> tableVersions = context.getBaseVersions().getTableVersions();
long visibleVersion = tableVersions.containsKey(id) ? tableVersions.get(id) : getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -748,7 +749,7 @@ public Set<String> getPartitionColumnNames() {
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems() {
public Map<String, PartitionItem> getAndCopyPartitionItems(OptionalLong snapshotId) {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
Expand All @@ -763,8 +764,8 @@ public Map<String, PartitionItem> getAndCopyPartitionItems() {
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
throws AnalysisException {
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
OptionalLong snapshotId) throws AnalysisException {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
Expand All @@ -776,7 +777,8 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId)
throws AnalysisException {
if (getPartitionType() == PartitionType.UNPARTITIONED) {
return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -312,7 +313,7 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems() {
public Map<String, PartitionItem> getAndCopyPartitionItems(OptionalLong snapshotId) {
return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
}

Expand All @@ -333,7 +334,8 @@ public List<Column> getPartitionColumns() {
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
OptionalLong snapshotId)
throws AnalysisException {
PaimonPartition paimonPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
if (paimonPartition == null) {
Expand All @@ -343,7 +345,8 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId)
throws AnalysisException {
return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -329,7 +330,7 @@ public static boolean isSyncWithPartitions(MTMVRefreshContext context, String mt
}
for (String relatedPartitionName : relatedPartitionNames) {
MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionName, context);
.getPartitionSnapshot(relatedPartitionName, context, OptionalLong.empty());
if (!mtmv.getRefreshSnapshot()
.equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName,
relatedPartitionCurrentSnapshot)) {
Expand Down Expand Up @@ -446,7 +447,7 @@ private static boolean isSyncWithBaseTable(MTMVRefreshContext context, String mt
if (!baseTable.needAutoRefresh()) {
return true;
}
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context);
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context, OptionalLong.empty());
return mtmv.getRefreshSnapshot()
.equalsWithBaseTable(mtmvPartitionName, new BaseTableInfo(baseTable), baseTableCurrentSnapshot);
}
Expand Down Expand Up @@ -482,7 +483,7 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
for (String relatedPartitionName : relatedPartitionNames) {
MTMVSnapshotIf partitionSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionName, context);
.getPartitionSnapshot(relatedPartitionName, context, OptionalLong.empty());
refreshPartitionSnapshot.getPartitions()
.put(relatedPartitionName, partitionSnapshot);
}
Expand All @@ -497,7 +498,7 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres
continue;
}
refreshPartitionSnapshot.addTableSnapshot(baseTableInfo,
((MTMVRelatedTableIf) table).getTableSnapshot(context));
((MTMVRelatedTableIf) table).getTableSnapshot(context, OptionalLong.empty()));
}
return refreshPartitionSnapshot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.common.AnalysisException;

import java.util.Map;
import java.util.OptionalLong;

/**
* get all related partition descs
Expand All @@ -29,6 +30,6 @@ public class MTMVRelatedPartitionDescInitGenerator implements MTMVRelatedPartiti
@Override
public void apply(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties,
RelatedPartitionDescResult lastResult) throws AnalysisException {
lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems());
lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems(OptionalLong.empty()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

/**
Expand All @@ -38,9 +39,10 @@ public interface MTMVRelatedTableIf extends TableIf {
* Note: This method is called every time there is a refresh and transparent rewrite,
* so if this method is slow, it will significantly reduce query performance
*
* @param snapshotId
* @return partitionName->PartitionItem
*/
Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException;
Map<String, PartitionItem> getAndCopyPartitionItems(OptionalLong snapshotId) throws AnalysisException;

/**
* getPartitionType LIST/RANGE/UNPARTITIONED
Expand Down Expand Up @@ -70,24 +72,27 @@ public interface MTMVRelatedTableIf extends TableIf {
* If snapshots have already been obtained in bulk in the context,
* the results should be obtained directly from the context
*
* @param snapshotId
* @param partitionName
* @param context
* @return partition snapshot at current time
* @throws AnalysisException
*/
MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) throws AnalysisException;
MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, OptionalLong snapshotId)
throws AnalysisException;

/**
* getTableSnapshot
* It is best to use the version. If there is no version, use the last update time
* If snapshots have already been obtained in bulk in the context,
* the results should be obtained directly from the context
*
* @param snapshotId
* @param context
* @return table snapshot at current time
* @throws AnalysisException
*/
MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException;
MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId) throws AnalysisException;

/**
* Does the current type of table allow timed triggering
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.junit.Test;

import java.util.List;
import java.util.OptionalLong;
import java.util.Set;

public class MTMVPartitionUtilTest {
Expand Down Expand Up @@ -112,7 +113,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc
minTimes = 0;
result = true;

baseOlapTable.getTableSnapshot((MTMVRefreshContext) any);
baseOlapTable.getTableSnapshot((MTMVRefreshContext) any, (OptionalLong) any);
minTimes = 0;
result = baseSnapshotIf;

Expand All @@ -132,7 +133,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc
minTimes = 0;
result = true;

baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any);
baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any, (OptionalLong) any);
minTimes = 0;
result = baseSnapshotIf;

Expand Down

0 comments on commit 5edc7f9

Please sign in to comment.