Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jan 6, 2025
1 parent 049857e commit f458b88
Show file tree
Hide file tree
Showing 5 changed files with 783 additions and 770 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.View;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
Expand All @@ -39,6 +40,7 @@
import org.apache.doris.nereids.SqlCacheContext.FullColumnName;
import org.apache.doris.nereids.SqlCacheContext.FullTableName;
import org.apache.doris.nereids.SqlCacheContext.ScanTable;
import org.apache.doris.nereids.SqlCacheContext.TableVersion;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundVariable;
import org.apache.doris.nereids.parser.NereidsParser;
Expand Down Expand Up @@ -308,20 +310,36 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
return true;
}

for (ScanTable scanTable : sqlCacheContext.getScanTables()) {
FullTableName fullTableName = scanTable.fullTableName;
TableIf tableIf = findTableIf(env, fullTableName);
if (!(tableIf instanceof OlapTable)) {
// the query maybe scan empty partition of the table, we should check these table version too,
// but the table not exists in sqlCacheContext.getScanTables(), so we need check here
// check table type and version
for (Entry<FullTableName, TableVersion> scanTable : sqlCacheContext.getUsedTables().entrySet()) {
TableVersion tableVersion = scanTable.getValue();
if (tableVersion.type != TableType.OLAP) {
return true;
}
TableIf tableIf = findTableIf(env, scanTable.getKey());
if (!(tableIf instanceof OlapTable) || tableVersion.id != tableIf.getId()) {
return true;
}

OlapTable olapTable = (OlapTable) tableIf;
long currentTableVersion = olapTable.getVisibleVersion();
long cacheTableVersion = scanTable.latestVersion;
long cacheTableVersion = tableVersion.version;
// some partitions have been dropped, or delete or updated or replaced, or insert rows into new partition?
if (currentTableVersion != cacheTableVersion) {
return true;
}
}

// check partition version
for (ScanTable scanTable : sqlCacheContext.getScanTables()) {
FullTableName fullTableName = scanTable.fullTableName;
TableIf tableIf = findTableIf(env, fullTableName);
if (!(tableIf instanceof OlapTable)) {
return true;
}
OlapTable olapTable = (OlapTable) tableIf;
Collection<Long> partitionIds = scanTable.getScanPartitions();
olapTable.getVersionInBatchForCloudMode(partitionIds);

Expand Down Expand Up @@ -392,7 +410,7 @@ private boolean dataMaskPoliciesChanged(
*/
private boolean tryLockTables(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
StatementContext currentStatementContext = connectContext.getStatementContext();
for (FullTableName fullTableName : sqlCacheContext.getUsedTables()) {
for (FullTableName fullTableName : sqlCacheContext.getUsedTables().keySet()) {
TableIf tableIf = findTableIf(env, fullTableName);
if (tableIf == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.mysql.FieldInfo;
Expand All @@ -41,6 +43,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -61,7 +64,8 @@ public class SqlCacheContext {
private volatile long latestPartitionTime = -1;
private volatile long latestPartitionVersion = -1;
private volatile long sumOfPartitionNum = -1;
private final Set<FullTableName> usedTables = Sets.newLinkedHashSet();
// value: version of table
private final Map<FullTableName, TableVersion> usedTables = Maps.newLinkedHashMap();
// value: ddl sql
private final Map<FullTableName, String> usedViews = Maps.newLinkedHashMap();
// value: usedColumns
Expand Down Expand Up @@ -135,8 +139,13 @@ public synchronized void addUsedTable(TableIf tableIf) {
return;
}

usedTables.add(
new FullTableName(database.getCatalog().getName(), database.getFullName(), tableIf.getName())
usedTables.put(
new FullTableName(database.getCatalog().getName(), database.getFullName(), tableIf.getName()),
new TableVersion(
tableIf.getId(),
tableIf instanceof OlapTable ? ((OlapTable) tableIf).getVisibleVersion() : 0L,
tableIf.getType()
)
);
}

Expand Down Expand Up @@ -282,8 +291,8 @@ public void setCacheProxy(CacheProxy cacheProxy) {
this.cacheProxy = cacheProxy;
}

public Set<FullTableName> getUsedTables() {
return ImmutableSet.copyOf(usedTables);
public Map<FullTableName, TableVersion> getUsedTables() {
return Collections.unmodifiableMap(usedTables);
}

public Map<FullTableName, String> getUsedViews() {
Expand Down Expand Up @@ -458,6 +467,14 @@ public void addScanPartition(Long partitionId) {
}
}

@lombok.Data
@lombok.AllArgsConstructor
public static class TableVersion {
public final long id;
public final long version;
public final TableType type;
}

/** CacheKeyType */
public enum CacheKeyType {
// use `userIdentity`:`sql`.trim() as Cache key in FE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ public List<Plan> getAncestors() {
}

public void updateActualRowCount(long actualRowCount) {
statistics.setActualRowCount(actualRowCount);
if (statistics != null) {
statistics.setActualRowCount(actualRowCount);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import org.apache.doris.regression.RegressionTest
import org.apache.doris.regression.action.BenchmarkAction
import org.apache.doris.regression.action.ProfileAction
import org.apache.doris.regression.action.WaitForAction
import org.apache.doris.regression.util.DataUtils
import org.apache.doris.regression.util.OutputUtils
import org.apache.doris.regression.action.CreateMVAction
import org.apache.doris.regression.action.ExplainAction
Expand All @@ -60,13 +59,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory

import java.sql.Connection
import java.io.File
import java.math.BigDecimal;
import java.sql.PreparedStatement
import java.sql.ResultSetMetaData
import java.util.Map;
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.ThreadFactory
Expand Down Expand Up @@ -1087,6 +1080,20 @@ class Suite implements GroovyInterceptable {
}
}

void foreachFrontends(Closure action) {
def rows = sql_return_maparray("show frontends")
for (def row in rows) {
action(row)
}
}

void foreachBackends(Closure action) {
def rows = sql_return_maparray("show backends")
for (def row in rows) {
action(row)
}
}

List<String> getFrontendIpHttpPort() {
return sql_return_maparray("show frontends").collect { it.Host + ":" + it.HttpPort };
}
Expand Down
Loading

0 comments on commit f458b88

Please sign in to comment.