Skip to content

Commit

Permalink
Add Meta refactor development branch
Browse files Browse the repository at this point in the history
Signed-off-by: HangyuanLiu <[email protected]>
  • Loading branch information
HangyuanLiu committed Sep 14, 2024
1 parent 4bd744c commit 9ebc2bf
Show file tree
Hide file tree
Showing 267 changed files with 10,479 additions and 9,900 deletions.
425 changes: 361 additions & 64 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobExecutor.java

Large diffs are not rendered by default.

468 changes: 14 additions & 454 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public Void visitTableRenameClause(TableRenameClause clause, ConnectContext cont
final RenameMaterializedViewLog renameMaterializedViewLog =
new RenameMaterializedViewLog(table.getId(), db.getId(), newMvName);
updateTaskDefinition((MaterializedView) table);
GlobalStateMgr.getCurrentState().getEditLog().logMvRename(renameMaterializedViewLog);
GlobalStateMgr.getCurrentState().getLocalMetastore().renameMaterializedView(renameMaterializedViewLog);
LOG.info("rename materialized view[{}] to {}, id: {}", oldMvName, newMvName, table.getId());
return null;
}
Expand Down Expand Up @@ -307,7 +307,7 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause modifyT
if (isChanged) {
ModifyTablePropertyOperationLog log = new ModifyTablePropertyOperationLog(materializedView.getDbId(),
materializedView.getId(), propClone);
GlobalStateMgr.getCurrentState().getEditLog().logAlterMaterializedViewProperties(log);
GlobalStateMgr.getCurrentState().getLocalMetastore().alterMaterializedViewProperties(log);
}
LOG.info("alter materialized view properties {}, id: {}", propClone, materializedView.getId());
return null;
Expand Down Expand Up @@ -378,7 +378,7 @@ public Void visitRefreshSchemeClause(RefreshSchemeClause refreshSchemeDesc, Conn
}

final ChangeMaterializedViewRefreshSchemeLog log = new ChangeMaterializedViewRefreshSchemeLog(materializedView);
GlobalStateMgr.getCurrentState().getEditLog().logMvChangeRefreshScheme(log);
GlobalStateMgr.getCurrentState().getLocalMetastore().changeMaterializedRefreshScheme(log);
} finally {
locker.unLockDatabase(db.getId(), LockType.WRITE);
}
Expand All @@ -403,11 +403,11 @@ public Void visitAlterMaterializedViewStatusClause(AlterMaterializedViewStatusCl
return null;
}

GlobalStateMgr.getCurrentState().getAlterJobMgr().
alterMaterializedViewStatus(materializedView, status, false);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.alterMaterializedViewStatus(materializedView, status, false);
// for manual refresh type, do not refresh
if (materializedView.getRefreshScheme().getType() != MaterializedView.RefreshType.MANUAL) {
GlobalStateMgr.getCurrentState().getLocalMetastore()
GlobalStateMgr.getCurrentState().getStarRocksMetadata()
.refreshMaterializedView(dbName, materializedView.getName(), true, null,
Constants.TaskRunPriority.NORMAL.value(), true, false);
}
Expand All @@ -418,14 +418,14 @@ public Void visitAlterMaterializedViewStatusClause(AlterMaterializedViewStatusCl
LOG.warn("Setting the materialized view {}({}) to inactive because " +
"user use alter materialized view set status to inactive",
materializedView.getName(), materializedView.getId());
GlobalStateMgr.getCurrentState().getAlterJobMgr().
alterMaterializedViewStatus(materializedView, status, false);
GlobalStateMgr.getCurrentState().getLocalMetastore()
.alterMaterializedViewStatus(materializedView, status, false);
} else {
throw new AlterJobException("Unsupported modification materialized view status:" + status);
}
AlterMaterializedViewStatusLog log = new AlterMaterializedViewStatusLog(materializedView.getDbId(),
materializedView.getId(), status);
GlobalStateMgr.getCurrentState().getEditLog().logAlterMvStatus(log);
GlobalStateMgr.getCurrentState().getLocalMetastore().alterMvStatus(log);
return null;
} catch (DdlException | MetaNotFoundException e) {
throw new AlterJobException(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.starrocks.lake.compaction.PartitionIdentifier;
import com.starrocks.qe.ShowResultSet;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.LocalMetastore;
import com.starrocks.server.RunMode;
import com.starrocks.sql.ast.AlterClause;
import com.starrocks.sql.ast.CompactionClause;
Expand All @@ -47,7 +48,7 @@
import java.util.ArrayList;
import java.util.List;

public class CompactionHandler {
public class CompactionHandler {
private static final Logger LOG = LogManager.getLogger(CompactionHandler.class);

// add synchronized to avoid process 2 or more stmts at same time
Expand Down Expand Up @@ -82,11 +83,16 @@ public static synchronized ShowResultSet process(List<AlterClause> alterClauses,
locker.lockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(olapTable.getId()), LockType.READ);
try {
List<Partition> allPartitions = findAllPartitions(olapTable, compactionClause);

LocalMetastore localMetastore = GlobalStateMgr.getCurrentState().getLocalMetastore();
for (Partition partition : allPartitions) {
for (PhysicalPartition physicalPartition : partition.getSubPartitions()) {
for (MaterializedIndex index : physicalPartition.getMaterializedIndices(
MaterializedIndex.IndexExtState.VISIBLE)) {
for (Tablet tablet : index.getTablets()) {
List<PhysicalPartition> physicalPartitionList = localMetastore.getAllPhysicalPartition(partition);
for (PhysicalPartition physicalPartition : physicalPartitionList) {
List<MaterializedIndex> materializedIndices = localMetastore
.getMaterializedIndices(physicalPartition, MaterializedIndex.IndexExtState.VISIBLE);
for (MaterializedIndex materializedIndex : materializedIndices) {
List<Tablet> tabletList = localMetastore.getAllTablets(materializedIndex);
for (Tablet tablet : tabletList) {
for (Long backendId : ((LocalTablet) tablet).getBackendIds()) {
backendToTablets.put(backendId, tablet.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ public AlterJobV2 build() throws UserException {
long partitionId = partition.getParentId();
long physicalPartitionId = partition.getId();
long shardGroupId = partition.getShardGroupId();
List<Tablet> originTablets = partition.getIndex(originIndexId).getTablets();

MaterializedIndex materializedIndex = partition.getIndex(originIndexId);
List<Tablet> originTablets = GlobalStateMgr.getCurrentState().getLocalMetastore()
.getAllTablets(materializedIndex);

// TODO: It is not good enough to create shards into the same group id, schema change PR needs to
// revise the code again.
List<Long> originTabletIds = originTablets.stream().map(Tablet::getId).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ boolean publishVersion() {
long commitVersion = commitVersionMap.get(partitionId);
Map<Long, MaterializedIndex> dirtyIndexMap = physicalPartitionIndexMap.row(partitionId);
for (MaterializedIndex index : dirtyIndexMap.values()) {
Utils.publishVersion(index.getTablets(), txnInfo, commitVersion - 1, commitVersion,
List<Tablet> tabletList = GlobalStateMgr.getCurrentState().getLocalMetastore().getAllTablets(index);
Utils.publishVersion(tabletList, txnInfo, commitVersion - 1, commitVersion,
warehouseId);
}
}
Expand Down Expand Up @@ -305,7 +306,7 @@ public void updatePhysicalPartitionTabletMeta(Database db, OlapTable table, Part
locker.unLockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(table.getId()), LockType.READ);
}
for (MaterializedIndex index : indexList) {
updateIndexTabletMeta(db, table, partition, index);
updateIndexTabletMeta(db, table, partition.getDefaultPhysicalPartition(), index);
}
}

Expand All @@ -319,7 +320,7 @@ public void updateIndexTabletMeta(Database db, OlapTable table, PhysicalPartitio
Locker locker = new Locker();
locker.lockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(table.getId()), LockType.READ);
try {
tablets = new ArrayList<>(index.getTablets());
tablets = GlobalStateMgr.getCurrentState().getLocalMetastore().getAllTablets(index);
} finally {
locker.unLockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(table.getId()), LockType.READ);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -338,7 +337,9 @@ protected void runPendingJob() throws AlterCancelException {
if (enableTabletCreationOptimization) {
numTablets = physicalPartitionIndexMap.size();
} else {
numTablets = physicalPartitionIndexMap.values().stream().map(MaterializedIndex::getTablets)
numTablets = physicalPartitionIndexMap.values().stream()
.map(materializedIndex -> GlobalStateMgr.getCurrentState().getLocalMetastore()
.getAllTablets(materializedIndex))
.mapToLong(List::size).sum();
}
countDownLatch = new MarkedCountDownLatch<>((int) numTablets);
Expand Down Expand Up @@ -373,7 +374,7 @@ protected void runPendingJob() throws AlterCancelException {
.build().toTabletSchema();

boolean createSchemaFile = true;
for (Tablet shadowTablet : shadowIdx.getTablets()) {
for (Tablet shadowTablet : GlobalStateMgr.getCurrentState().getLocalMetastore().getAllTablets(shadowIdx)) {
long shadowTabletId = shadowTablet.getId();
ComputeNode computeNode = GlobalStateMgr.getCurrentState().getWarehouseMgr()
.getComputeNodeAssignedToTablet(warehouseId, (LakeTablet) shadowTablet);
Expand Down Expand Up @@ -418,7 +419,7 @@ protected void runPendingJob() throws AlterCancelException {
}

sendAgentTaskAndWait(batchTask, countDownLatch, Config.tablet_create_timeout_second * numTablets,
waitingCreatingReplica, isCancelling);
waitingCreatingReplica, isCancelling);

// Add shadow indexes to table.
try (WriteLockedDatabase db = getWriteLockedDatabase(dbId)) {
Expand Down Expand Up @@ -483,7 +484,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
long shadowIdxId = entry.getKey();
MaterializedIndex shadowIdx = entry.getValue();
for (Tablet shadowTablet : shadowIdx.getTablets()) {
for (Tablet shadowTablet : GlobalStateMgr.getCurrentState().getLocalMetastore().getAllTablets(shadowIdx)) {
ComputeNode computeNode = GlobalStateMgr.getCurrentState().getWarehouseMgr()
.getComputeNodeAssignedToTablet(warehouseId, (LakeTablet) shadowTablet);
if (computeNode == null) {
Expand Down Expand Up @@ -610,7 +611,7 @@ protected void runFinishedRewritingJob() throws AlterCancelException {

// Delete tablet and shards
for (MaterializedIndex droppedIndex : droppedIndexes) {
List<Long> shards = droppedIndex.getTablets().stream().map(Tablet::getId).collect(Collectors.toList());
List<Long> shards = GlobalStateMgr.getCurrentState().getLocalMetastore().getAllTabletIDs(droppedIndex);
// TODO: what if unusedShards deletion is partially successful?
StarMgrMetaSyncer.dropTabletAndDeleteShard(shards, GlobalStateMgr.getCurrentState().getStarOSAgent());
}
Expand Down Expand Up @@ -668,7 +669,9 @@ boolean publishVersion() {
long commitVersion = commitVersionMap.get(partitionId);
Map<Long, MaterializedIndex> shadowIndexMap = physicalPartitionIndexMap.row(partitionId);
for (MaterializedIndex shadowIndex : shadowIndexMap.values()) {
Utils.publishVersion(shadowIndex.getTablets(), txnInfo, 1, commitVersion, warehouseId);
Utils.publishVersion(
GlobalStateMgr.getCurrentState().getLocalMetastore().getAllTablets(shadowIndex),
txnInfo, 1, commitVersion, warehouseId);
}
}
return true;
Expand Down Expand Up @@ -700,7 +703,7 @@ private void inactiveRelatedMv(Set<String> modifiedColumns, @NotNull OlapTable t
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
for (MvId mvId : tbl.getRelatedMaterializedViews()) {
MaterializedView mv = (MaterializedView) GlobalStateMgr.getCurrentState().getLocalMetastore()
.getTable(db.getId(), mvId.getId());
.getTable(db.getId(), mvId.getId());
if (mv == null) {
LOG.warn("Ignore materialized view {} does not exists", mvId);
continue;
Expand Down
Loading

0 comments on commit 9ebc2bf

Please sign in to comment.