Skip to content

Commit

Permalink
[opt](Nereids) lock table in ascending order of table IDs (apache#45045)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Problem Summary:

Doris's table locks are fair read-write locks. If two threads acquire
read locks on tables in different orders and simultaneously a third
thread attempts to acquire a write lock on one of these tables, a
deadlock can form between the two threads trying to acquire read locks.
This PR changes the lock acquisition order for queries to follow the
order of table IDs, ensuring that the lock acquisition order for tables
is consistent among different threads.


### Release note

Execute table locking operations in ascending order of table IDs
  • Loading branch information
morrySnow authored Dec 19, 2024
1 parent 40c6c61 commit 24328d1
Show file tree
Hide file tree
Showing 54 changed files with 1,083 additions and 1,218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1054,10 +1054,7 @@ public boolean equals(Object obj) {
&& isKey == other.isKey
&& isAllowNull == other.isAllowNull
&& isAutoInc == other.isAutoInc
&& getDataType().equals(other.getDataType())
&& getStrLen() == other.getStrLen()
&& getPrecision() == other.getPrecision()
&& getScale() == other.getScale()
&& Objects.equals(type, other.type)
&& Objects.equals(comment, other.comment)
&& visible == other.visible
&& Objects.equals(children, other.children)
Expand Down
8 changes: 4 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
// to connection issues such as S3, so it is directly set to null
if (!isReplay) {
// shouldn't do this while holding mvWriteLock
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, true);
}
} catch (Throwable e) {
mtmvCache = null;
Expand Down Expand Up @@ -323,7 +323,7 @@ public MTMVCache getOrGenerateCache(ConnectContext connectionContext) throws Ana
MTMVCache mtmvCache;
try {
// Should new context with ADMIN user
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true, false);
} finally {
connectionContext.setThreadLocalInfo();
}
Expand Down Expand Up @@ -362,7 +362,7 @@ public MTMVRefreshSnapshot getRefreshSnapshot() {
*
* @return mvPartitionName ==> mvPartitionKeyDesc
*/
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() throws AnalysisException {
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() {
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems();
Map<String, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<String, PartitionItem> entry : mtmvItems.entrySet()) {
Expand Down Expand Up @@ -392,7 +392,7 @@ public Pair<Map<String, Set<String>>, Map<String, String>> calculateDoublyPartit
Map<String, String> baseToMv = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItemsWithoutLock();
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
Set<String> basePartitionNames = relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(),
Sets.newHashSet());
Expand Down
30 changes: 11 additions & 19 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -3325,33 +3324,26 @@ public PartitionType getPartitionType() {
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
throws AnalysisException {
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
return getAndCopyPartitionItems();
}

public Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException {
if (!tryReadLock(1, TimeUnit.MINUTES)) {
throw new AnalysisException("get table read lock timeout, database=" + getDBName() + ",table=" + getName());
}
public Map<String, PartitionItem> getAndCopyPartitionItems() {
readLock();
try {
return getAndCopyPartitionItemsWithoutLock();
Map<String, PartitionItem> res = Maps.newHashMap();
for (Entry<Long, PartitionItem> entry : getPartitionInfo().getIdToItem(false).entrySet()) {
Partition partition = idToPartition.get(entry.getKey());
if (partition != null) {
res.put(partition.getName(), entry.getValue());
}
}
return res;
} finally {
readUnlock();
}
}

public Map<String, PartitionItem> getAndCopyPartitionItemsWithoutLock() throws AnalysisException {
Map<String, PartitionItem> res = Maps.newHashMap();
for (Entry<Long, PartitionItem> entry : getPartitionInfo().getIdToItem(false).entrySet()) {
Partition partition = idToPartition.get(entry.getKey());
if (partition != null) {
res.put(partition.getName(), entry.getValue());
}
}
return res;
}

@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
return getPartitionColumns();
Expand Down
134 changes: 48 additions & 86 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,55 +213,43 @@ default Map<String, Constraint> getConstraintsMapUnsafe() {
}

default Set<ForeignKeyConstraint> getForeignKeyConstraints() {
readLock();
try {
return getConstraintsMapUnsafe().values().stream()
.filter(ForeignKeyConstraint.class::isInstance)
.map(ForeignKeyConstraint.class::cast)
.collect(ImmutableSet.toImmutableSet());
} catch (Exception ignored) {
return ImmutableSet.of();
} finally {
readUnlock();
}
}

default Map<String, Constraint> getConstraintsMap() {
readLock();
try {
return ImmutableMap.copyOf(getConstraintsMapUnsafe());
} catch (Exception ignored) {
return ImmutableMap.of();
} finally {
readUnlock();
}
}

default Set<PrimaryKeyConstraint> getPrimaryKeyConstraints() {
readLock();
try {
return getConstraintsMapUnsafe().values().stream()
.filter(PrimaryKeyConstraint.class::isInstance)
.map(PrimaryKeyConstraint.class::cast)
.collect(ImmutableSet.toImmutableSet());
} catch (Exception ignored) {
return ImmutableSet.of();
} finally {
readUnlock();
}
}

default Set<UniqueConstraint> getUniqueConstraints() {
readLock();
try {
return getConstraintsMapUnsafe().values().stream()
.filter(UniqueConstraint.class::isInstance)
.map(UniqueConstraint.class::cast)
.collect(ImmutableSet.toImmutableSet());
} catch (Exception ignored) {
return ImmutableSet.of();
} finally {
readUnlock();
}
}

Expand All @@ -280,34 +268,24 @@ default void checkConstraintNotExistenceUnsafe(String name, Constraint primaryKe
}

default void addUniqueConstraint(String name, ImmutableList<String> columns, boolean replay) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
UniqueConstraint uniqueConstraint = new UniqueConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistenceUnsafe(name, uniqueConstraint, constraintMap);
constraintMap.put(name, uniqueConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(uniqueConstraint, this));
}
} finally {
writeUnlock();
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
UniqueConstraint uniqueConstraint = new UniqueConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistenceUnsafe(name, uniqueConstraint, constraintMap);
constraintMap.put(name, uniqueConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(uniqueConstraint, this));
}
}

default void addPrimaryKeyConstraint(String name, ImmutableList<String> columns, boolean replay) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
PrimaryKeyConstraint primaryKeyConstraint = new PrimaryKeyConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistenceUnsafe(name, primaryKeyConstraint, constraintMap);
constraintMap.put(name, primaryKeyConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(primaryKeyConstraint, this));
}
} finally {
writeUnlock();
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
PrimaryKeyConstraint primaryKeyConstraint = new PrimaryKeyConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistenceUnsafe(name, primaryKeyConstraint, constraintMap);
constraintMap.put(name, primaryKeyConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(primaryKeyConstraint, this));
}
}

Expand All @@ -326,26 +304,19 @@ default PrimaryKeyConstraint tryGetPrimaryKeyForForeignKeyUnsafe(

default void addForeignConstraint(String name, ImmutableList<String> columns,
TableIf referencedTable, ImmutableList<String> referencedColumns, boolean replay) {
writeLock();
referencedTable.writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
ForeignKeyConstraint foreignKeyConstraint =
new ForeignKeyConstraint(name, columns, referencedTable, referencedColumns);
checkConstraintNotExistenceUnsafe(name, foreignKeyConstraint, constraintMap);
PrimaryKeyConstraint requirePrimaryKeyName = new PrimaryKeyConstraint(name,
foreignKeyConstraint.getReferencedColumnNames());
PrimaryKeyConstraint primaryKeyConstraint =
tryGetPrimaryKeyForForeignKeyUnsafe(requirePrimaryKeyName, referencedTable);
primaryKeyConstraint.addForeignTable(this);
constraintMap.put(name, foreignKeyConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(foreignKeyConstraint, this));
}
} finally {
referencedTable.writeUnlock();
writeUnlock();
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
ForeignKeyConstraint foreignKeyConstraint =
new ForeignKeyConstraint(name, columns, referencedTable, referencedColumns);
checkConstraintNotExistenceUnsafe(name, foreignKeyConstraint, constraintMap);
PrimaryKeyConstraint requirePrimaryKeyName = new PrimaryKeyConstraint(name,
foreignKeyConstraint.getReferencedColumnNames());
PrimaryKeyConstraint primaryKeyConstraint =
tryGetPrimaryKeyForForeignKeyUnsafe(requirePrimaryKeyName, referencedTable);
primaryKeyConstraint.addForeignTable(this);
constraintMap.put(name, foreignKeyConstraint);
if (!replay) {
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(foreignKeyConstraint, this));
}
}

Expand Down Expand Up @@ -381,40 +352,31 @@ default void replayDropConstraint(String name) {
}

default void dropConstraint(String name, boolean replay) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
if (!constraintMap.containsKey(name)) {
throw new AnalysisException(
String.format("Unknown constraint %s on table %s.", name, this.getName()));
}
Constraint constraint = constraintMap.get(name);
constraintMap.remove(name);
if (constraint instanceof PrimaryKeyConstraint) {
((PrimaryKeyConstraint) constraint).getForeignTables()
.forEach(t -> t.dropFKReferringPK(this, (PrimaryKeyConstraint) constraint));
}
if (!replay) {
Env.getCurrentEnv().getEditLog().logDropConstraint(new AlterConstraintLog(constraint, this));
}
} finally {
writeUnlock();
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
if (!constraintMap.containsKey(name)) {
throw new AnalysisException(
String.format("Unknown constraint %s on table %s.", name, this.getName()));
}
Constraint constraint = constraintMap.get(name);
constraintMap.remove(name);
if (constraint instanceof PrimaryKeyConstraint) {
((PrimaryKeyConstraint) constraint).getForeignTables()
.forEach(t -> t.dropFKReferringPK(this, (PrimaryKeyConstraint) constraint));
}
if (!replay) {
Env.getCurrentEnv().getEditLog().logDropConstraint(new AlterConstraintLog(constraint, this));
}
}

default void dropFKReferringPK(TableIf table, PrimaryKeyConstraint constraint) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
Set<String> fkName = constraintMap.entrySet().stream()
.filter(e -> e.getValue() instanceof ForeignKeyConstraint
&& ((ForeignKeyConstraint) e.getValue()).isReferringPK(table, constraint))
.map(Entry::getKey)
.collect(Collectors.toSet());
fkName.forEach(constraintMap::remove);
} finally {
writeUnlock();
}
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
Set<String> fkName = constraintMap.entrySet().stream()
.filter(e -> e.getValue() instanceof ForeignKeyConstraint
&& ((ForeignKeyConstraint) e.getValue()).isReferringPK(table, constraint))
.map(Entry::getKey)
.collect(Collectors.toSet());
fkName.forEach(constraintMap::remove);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) {
Env env = connectContext.getEnv();

if (!tryLockTables(connectContext, env, sqlCacheContext)) {
return invalidateCache(key);
}

// check table and view and their columns authority
if (privilegeChanged(connectContext, env, sqlCacheContext)) {
return invalidateCache(key);
Expand Down Expand Up @@ -378,16 +382,38 @@ private boolean dataMaskPoliciesChanged(
return false;
}

private boolean privilegeChanged(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
/**
* Execute table locking operations in ascending order of table IDs.
*
* @return true if obtain all tables lock.
*/
private boolean tryLockTables(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
StatementContext currentStatementContext = connectContext.getStatementContext();
for (FullTableName fullTableName : sqlCacheContext.getUsedTables()) {
TableIf tableIf = findTableIf(env, fullTableName);
if (tableIf == null) {
return false;
}
currentStatementContext.getTables().put(fullTableName.toList(), tableIf);
}
for (FullTableName fullTableName : sqlCacheContext.getUsedViews().keySet()) {
TableIf tableIf = findTableIf(env, fullTableName);
if (tableIf == null) {
return false;
}
currentStatementContext.getTables().put(fullTableName.toList(), tableIf);
}
currentStatementContext.lock();
return true;
}

private boolean privilegeChanged(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
for (Entry<FullTableName, Set<String>> kv : sqlCacheContext.getCheckPrivilegeTablesOrViews().entrySet()) {
Set<String> usedColumns = kv.getValue();
TableIf tableIf = findTableIf(env, kv.getKey());
if (tableIf == null) {
return true;
}
// release when close statementContext
currentStatementContext.addTableReadLock(tableIf);
try {
UserAuthentication.checkPermission(tableIf, connectContext, usedColumns);
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,21 @@

package org.apache.doris.common.lock;

import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.qe.ConnectContext;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* A monitored version of ReentrantReadWriteLock that provides additional
* monitoring capabilities for read and write locks.
*/
public class MonitoredReentrantReadWriteLock extends ReentrantReadWriteLock {

private static final Logger LOG = LogManager.getLogger(MonitoredReentrantReadWriteLock.class);
// Monitored read and write lock instances
private final ReadLock readLock = new ReadLock(this);
private final WriteLock writeLock = new WriteLock(this);
Expand Down Expand Up @@ -97,6 +105,11 @@ protected WriteLock(ReentrantReadWriteLock lock) {
public void lock() {
super.lock();
monitor.afterLock();
if (isFair() && getReadHoldCount() > 0) {
LOG.warn(" read lock count is {}, write lock count is {}, stack is {}, query id is {}",
getReadHoldCount(), getWriteHoldCount(), Thread.currentThread().getStackTrace(),
ConnectContext.get() == null ? "" : DebugUtil.printId(ConnectContext.get().queryId()));
}
}

/**
Expand Down
Loading

0 comments on commit 24328d1

Please sign in to comment.