Skip to content

Commit

Permalink
[BugFix] Fix physical partition for rebalance (backport #46402) (back…
Browse files Browse the repository at this point in the history
…port #52182) (#52214)

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
mergify[bot] authored Oct 22, 2024
1 parent 87e46cb commit 6d81f75
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 137 deletions.
12 changes: 8 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1851,7 +1851,9 @@ public TTableDescriptor toThrift(List<ReferencedPartitionInfo> partitions) {
public long getRowCount() {
long rowCount = 0;
for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) {
rowCount += entry.getValue().getBaseIndex().getRowCount();
for (PhysicalPartition partition : entry.getValue().getSubPartitions()) {
rowCount += partition.getBaseIndex().getRowCount();
}
}
return rowCount;
}
Expand Down Expand Up @@ -3151,9 +3153,11 @@ public void removeTabletsFromInvertedIndex() {
TabletInvertedIndex invertedIndex = GlobalStateMgr.getCurrentState().getTabletInvertedIndex();
Collection<Partition> allPartitions = getAllPartitions();
for (Partition partition : allPartitions) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : subPartition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
}
}
}
Expand Down

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/load/ExportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.MysqlTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Table;
Expand Down Expand Up @@ -361,7 +361,7 @@ private void genTaskFragments(List<PlanFragment> fragments, List<ScanNode> scanN
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
long dataSize = 0L;
if (tabletMeta.isLakeTablet()) {
Partition partition = exportTable.getPartition(tabletMeta.getPartitionId());
PhysicalPartition partition = exportTable.getPhysicalPartition(tabletMeta.getPhysicalPartitionId());
if (partition != null) {
MaterializedIndex index = partition.getIndex(tabletMeta.getIndexId());
if (index != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PartitionType;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.SinglePartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Tablet;
Expand Down Expand Up @@ -408,9 +409,11 @@ private void gc(boolean isReplay) {

Partition partition = targetTable.getPartition(pid);
if (partition != null) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
// hash set is able to deduplicate the elements
sourceTablets.addAll(index.getTablets());
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : subPartition.getMaterializedIndices(
MaterializedIndex.IndexExtState.ALL)) {
sourceTablets.addAll(index.getTablets());
}
}
targetTable.dropTempPartition(partition.getName(), true);
} else {
Expand Down Expand Up @@ -470,8 +473,10 @@ private void doCommit(boolean isReplay) {
Set<Tablet> sourceTablets = Sets.newHashSet();
sourcePartitionNames.forEach(name -> {
Partition partition = targetTable.getPartition(name);
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
sourceTablets.addAll(index.getTablets());
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : subPartition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
sourceTablets.addAll(index.getTablets());
}
}
});
long sumSourceRows = job.getSourcePartitionIds().stream()
Expand Down
10 changes: 7 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/load/PartitionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.RangePartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Tablet;
Expand Down Expand Up @@ -149,9 +150,12 @@ public static void createAndAddTempPartitionsForTable(Database db, OlapTable tar
public static void clearTabletsFromInvertedIndex(List<Partition> partitions) {
TabletInvertedIndex invertedIndex = GlobalStateMgr.getCurrentState().getTabletInvertedIndex();
for (Partition partition : partitions) {
for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : materializedIndex.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
for (MaterializedIndex materializedIndex : subPartition.getMaterializedIndices(
MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : materializedIndex.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Tablet;
import com.starrocks.lake.LakeTablet;
Expand Down Expand Up @@ -65,8 +65,8 @@ public MetaScanNode(PlanNodeId id, TupleDescriptor desc, OlapTable olapTable,
}

public void computeRangeLocations() {
Collection<Partition> partitions = olapTable.getPartitions();
for (Partition partition : partitions) {
Collection<PhysicalPartition> partitions = olapTable.getPhysicalPartitions();
for (PhysicalPartition partition : partitions) {
MaterializedIndex index = partition.getBaseIndex();
int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
List<Tablet> tablets = index.getTablets();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4971,10 +4971,12 @@ public void onEraseDatabase(long dbId) {
public void onErasePartition(Partition partition) {
// remove tablet in inverted index
TabletInvertedIndex invertedIndex = GlobalStateMgr.getCurrentState().getTabletInvertedIndex();
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();
invertedIndex.deleteTablet(tabletId);
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : subPartition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();
invertedIndex.deleteTablet(tabletId);
}
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/sql/Explain.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.sql.common.ErrorType;
import com.starrocks.sql.common.StarRocksPlannerException;
import com.starrocks.sql.optimizer.ExpressionContext;
Expand Down Expand Up @@ -204,8 +205,10 @@ public OperatorStr visitPhysicalOlapScan(OptExpression optExpression, OperatorPr
int totalTabletsNum = 0;
for (Long partitionId : scan.getSelectedPartitionId()) {
final Partition partition = ((OlapTable) scan.getTable()).getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(scan.getSelectedIndexId());
totalTabletsNum += selectedTable.getTablets().size();
for (PhysicalPartition subPartition : partition.getSubPartitions()) {
final MaterializedIndex selectedTable = subPartition.getIndex(scan.getSelectedIndexId());
totalTabletsNum += selectedTable.getTablets().size();
}
}
String partitionAndBucketInfo = "partitionRatio: " +
scan.getSelectedPartitionId().size() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public void testBalance(@Mocked GlobalStateMgr globalStateMgr) {
result = Lists.newArrayList(table);
minTimes = 0;

GlobalStateMgr.getCurrentState().getLocalMetastore().getPartitionIncludeRecycleBin((OlapTable) any, anyLong);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.getPhysicalPartitionIncludeRecycleBin((OlapTable) any, anyLong);
result = partition;
minTimes = 0;

Expand Down Expand Up @@ -312,7 +313,8 @@ public void testBalanceWithSameHost(@Mocked GlobalStateMgr globalStateMgr) {
result = Lists.newArrayList(table);
minTimes = 0;

GlobalStateMgr.getCurrentState().getLocalMetastore().getPartitionIncludeRecycleBin((OlapTable) any, anyLong);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.getPhysicalPartitionIncludeRecycleBin((OlapTable) any, anyLong);
result = partition;
minTimes = 0;

Expand Down Expand Up @@ -494,11 +496,13 @@ public void testBalanceBackendTablet(@Mocked GlobalStateMgr globalStateMgr) {
result = Lists.newArrayList(table);
minTimes = 0;

GlobalStateMgr.getCurrentState().getLocalMetastore().getPartitionIncludeRecycleBin((OlapTable) any, partitionId1);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.getPhysicalPartitionIncludeRecycleBin((OlapTable) any, partitionId1);
result = partition1;
minTimes = 0;

GlobalStateMgr.getCurrentState().getLocalMetastore().getPartitionIncludeRecycleBin((OlapTable) any, partitionId2);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.getPhysicalPartitionIncludeRecycleBin((OlapTable) any, partitionId2);
result = partition2;
minTimes = 0;

Expand Down Expand Up @@ -685,7 +689,8 @@ public void testBalanceParallel(@Mocked GlobalStateMgr globalStateMgr) {
result = Lists.newArrayList(table);
minTimes = 0;

GlobalStateMgr.getCurrentState().getLocalMetastore().getPartitionIncludeRecycleBin((OlapTable) any, anyLong);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.getPhysicalPartitionIncludeRecycleBin((OlapTable) any, anyLong);
result = partition;
minTimes = 0;

Expand Down

0 comments on commit 6d81f75

Please sign in to comment.