Skip to content

Commit

Permalink
[Enhancement] Support show proc replications (backport #50483) (#50569)
Browse files Browse the repository at this point in the history
Signed-off-by: xiangguangyxg <[email protected]>
Co-authored-by: xiangguangyxg <[email protected]>
  • Loading branch information
mergify[bot] and xiangguangyxg authored Sep 3, 2024
1 parent 5d0941e commit d976483
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ private ProcService() {
root.register("compactions", new CompactionsProcNode());
root.register("warehouses", new WarehouseProcDir(GlobalStateMgr.getCurrentWarehouseMgr()));
root.register("meta_recovery", new MetaRecoveryProdDir());
root.register("replications", new ReplicationsProcNode());
}

// Get the corresponding PROC Node by the specified path
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.common.proc;

import com.google.common.base.Strings;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.replication.ReplicationJob;
import com.starrocks.replication.ReplicationMgr;
import com.starrocks.server.GlobalStateMgr;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Proc node that renders replication jobs as a table, one row per job.
 *
 * <p>Rows are emitted in three groups: running jobs, then aborted jobs, then committed jobs.
 * Within each group the jobs are ordered by ascending creation time.
 */
public class ReplicationsProcNode implements ProcNodeInterface {
    private static final List<String> TITLES = Collections.unmodifiableList(Arrays.asList(
            "JobID", "DatabaseID", "TableID", "TxnID", "CreatedTime", "FinishedTime", "State", "Progress", "Error"));

    public ReplicationsProcNode() {
    }

    /**
     * Builds the result table from the jobs currently held by the replication manager.
     *
     * @return a result whose column names are {@code TITLES} and whose rows describe the jobs
     * @throws AnalysisException declared by the interface; not thrown directly by this method
     */
    @Override
    public ProcResult fetchResult() throws AnalysisException {
        BaseProcResult table = new BaseProcResult();
        table.setNames(TITLES);

        ReplicationMgr mgr = GlobalStateMgr.getCurrentState().getReplicationMgr();
        // Copy every group before sorting so the manager's own collections are never reordered.
        List<ReplicationJob> running = new ArrayList<>(mgr.getRunningJobs());
        List<ReplicationJob> aborted = new ArrayList<>(mgr.getAbortedJobs());
        List<ReplicationJob> committed = new ArrayList<>(mgr.getCommittedJobs());

        running.sort(ReplicationsProcNode::compareByCreatedTime);
        aborted.sort(ReplicationsProcNode::compareByCreatedTime);
        committed.sort(ReplicationsProcNode::compareByCreatedTime);

        appendRows(table, running);
        appendRows(table, aborted);
        appendRows(table, committed);
        return table;
    }

    // Orders two jobs by ascending creation timestamp.
    private static int compareByCreatedTime(ReplicationJob left, ReplicationJob right) {
        return Long.compare(left.getCreatedTimeMs(), right.getCreatedTimeMs());
    }

    // Appends one row per job, in list order, to the given result.
    private static void appendRows(BaseProcResult table, List<ReplicationJob> jobs) {
        for (ReplicationJob job : jobs) {
            table.addRow(toRow(job));
        }
    }

    // Converts a job into its column values; the order must match TITLES.
    private static List<String> toRow(ReplicationJob job) {
        List<String> columns = new ArrayList<>();
        columns.add(job.getJobId());
        columns.add(String.valueOf(job.getDatabaseId()));
        columns.add(String.valueOf(job.getTableId()));
        columns.add(String.valueOf(job.getTransactionId()));
        columns.add(TimeUtils.longToTimeString(job.getCreatedTimeMs()));
        columns.add(TimeUtils.longToTimeString(job.getFinishedTimeMs()));
        columns.add(job.getState().toString());
        columns.add(job.getRunningTaskNum() + "/" + job.getTotalTaskNum());
        // The error message is null unless the job is aborted; show an empty cell in that case.
        columns.add(Strings.nullToEmpty(job.getErrorMessage()));
        return columns;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,11 @@ public void readFields(DataInput in) throws IOException {
data = ReplicationJobLog.read(in);
isRead = true;
break;
case OperationType.OP_DELETE_REPLICATION_JOB: {
data = ReplicationJobLog.read(in);
isRead = true;
break;
}
case OperationType.OP_RECOVER_PARTITION_VERSION:
data = GsonUtils.GSON.fromJson(Text.readString(in), PartitionVersionRecoveryInfo.class);
isRead = true;
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,11 @@ public static void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity jour
globalStateMgr.getReplicationMgr().replayReplicationJob(replicationJobLog.getReplicationJob());
break;
}
case OperationType.OP_DELETE_REPLICATION_JOB: {
ReplicationJobLog replicationJobLog = (ReplicationJobLog) journal.getData();
globalStateMgr.getReplicationMgr().replayDeleteReplicationJob(replicationJobLog.getReplicationJob());
break;
}
case OperationType.OP_RECOVER_PARTITION_VERSION: {
PartitionVersionRecoveryInfo info = (PartitionVersionRecoveryInfo) journal.getData();
GlobalStateMgr.getCurrentState().getMetaRecoveryDaemon().recoverPartitionVersion(info);
Expand Down Expand Up @@ -2119,6 +2124,11 @@ public void logReplicationJob(ReplicationJob replicationJob) {
logEdit(OperationType.OP_REPLICATION_JOB, replicationJobLog);
}

/**
 * Writes an {@code OP_DELETE_REPLICATION_JOB} edit-log record for the given job.
 *
 * @param replicationJob the job being deleted; it is wrapped in a {@code ReplicationJobLog}
 */
public void logDeleteReplicationJob(ReplicationJob replicationJob) {
    logEdit(OperationType.OP_DELETE_REPLICATION_JOB, new ReplicationJobLog(replicationJob));
}

/**
 * Writes an {@code OP_RECOVER_PARTITION_VERSION} edit-log record carrying the given info.
 *
 * @param info the partition version recovery payload passed through to {@code logEdit}
 */
public void logRecoverPartitionVersion(PartitionVersionRecoveryInfo info) {
logEdit(OperationType.OP_RECOVER_PARTITION_VERSION, info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,8 @@ public class OperationType {
// Replication job
// OP_REPLICATION_JOB: record written by EditLog.logReplicationJob and replayed through
// ReplicationMgr.replayReplicationJob.
@IgnorableOnReplayFailed
public static final short OP_REPLICATION_JOB = 13500;
// OP_DELETE_REPLICATION_JOB: record written by EditLog.logDeleteReplicationJob and replayed
// through ReplicationMgr.replayDeleteReplicationJob.
@IgnorableOnReplayFailed
public static final short OP_DELETE_REPLICATION_JOB = 13501;

public static final short OP_DISABLE_TABLE_RECOVERY = 13510;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,12 @@ public TSnapshotInfo getSnapshotInfo() {
// Identifier of this job, taken from the request/constructor argument when one is supplied.
@SerializedName(value = "jobId")
private final String jobId;

// Wall-clock time (System.currentTimeMillis()) captured when the job object is constructed.
@SerializedName(value = "createdTimeMs")
private final long createdTimeMs;

// Initialised to 0 by the constructors; setState() stamps it with the current time when the
// job becomes COMMITTED or ABORTED. isExpired() treats a non-positive value as "not finished".
@SerializedName(value = "finishedTimeMs")
private volatile long finishedTimeMs;

// Token copied from the replication request (src_token) or constructor argument.
@SerializedName(value = "srcToken")
private final String srcToken;

Expand Down Expand Up @@ -381,6 +387,14 @@ public String getJobId() {
return jobId;
}

/**
 * Returns the creation timestamp of this job.
 *
 * @return milliseconds since the epoch, as captured by {@code System.currentTimeMillis()}
 *         when the job was constructed
 */
public long getCreatedTimeMs() {
    return this.createdTimeMs;
}

/**
 * Returns the timestamp at which this job reached a final state.
 *
 * @return milliseconds since the epoch recorded when the state became COMMITTED or ABORTED;
 *         {@code 0} for a freshly constructed job that has not finished yet
 */
public long getFinishedTimeMs() {
    return this.finishedTimeMs;
}

/**
 * Returns the id of the database this job was created for.
 *
 * @return the database id
 */
public long getDatabaseId() {
    return this.databaseId;
}
Expand All @@ -389,6 +403,10 @@ public long getTableId() {
return tableId;
}

/**
 * Returns the id of the transaction associated with this job.
 *
 * @return the transaction id (the same id used to look up the transaction state for error reporting)
 */
public long getTransactionId() {
    return this.transactionId;
}

/**
 * Returns the replication data size recorded for this job.
 *
 * @return the value of {@code replicationDataSize}
 *         (NOTE(review): unit is not visible here, presumably bytes; confirm)
 */
public long getReplicationDataSize() {
    return this.replicationDataSize;
}
Expand All @@ -403,11 +421,40 @@ public ReplicationJobState getState() {

/**
 * Moves the job to the given state, persists the job to the edit log and logs the transition.
 *
 * <p>When the new state is COMMITTED or ABORTED, the finish timestamp is set to the current
 * time before the job is written to the edit log.
 *
 * @param state the new state of the job
 */
private void setState(ReplicationJobState state) {
    this.state = state;

    boolean reachedFinalState = state.equals(ReplicationJobState.COMMITTED)
            || state.equals(ReplicationJobState.ABORTED);
    if (reachedFinalState) {
        this.finishedTimeMs = System.currentTimeMillis();
    }

    GlobalStateMgr.getServingState().getEditLog().logReplicationJob(this);
    LOG.info("Replication job state: {}, database id: {}, table id: {}, transaction id: {}",
            state, databaseId, tableId, transactionId);
}

/**
 * Tells whether this finished job has outlived the configured history retention window.
 *
 * @return {@code true} if the job has a positive finish timestamp and more than
 *         {@code Config.history_job_keep_max_second} seconds have elapsed since then;
 *         {@code false} if the job has not finished or is still within the window
 */
public boolean isExpired() {
    // Read the volatile field once so the check and the subtraction use the same value.
    final long finishedAt = finishedTimeMs;
    if (finishedAt <= 0) {
        return false;
    }
    // Multiply as long: if the config value is an int, "* 1000" would overflow for retention
    // periods above ~24.8 days and yield a negative window, expiring every finished job at once.
    final long keepMs = Config.history_job_keep_max_second * 1000L;
    return (System.currentTimeMillis() - finishedAt) > keepMs;
}

/**
 * Returns how many tasks are currently tracked as running for this job.
 *
 * @return the size of {@code runningTasks}
 */
public long getRunningTaskNum() {
    return this.runningTasks.size();
}

/**
 * Returns the total task count recorded for this job.
 *
 * @return the value of {@code taskNum}
 */
public long getTotalTaskNum() {
    return this.taskNum;
}

/**
 * Returns the failure reason of an aborted job.
 *
 * @return {@code null} when the job is not in the ABORTED state; otherwise the reason stored on
 *         the job's transaction state, or {@code "Transaction not found"} when that transaction
 *         state can no longer be looked up
 */
public String getErrorMessage() {
    if (state.equals(ReplicationJobState.ABORTED)) {
        TransactionState txnState = GlobalStateMgr.getServingState().getGlobalTransactionMgr()
                .getTransactionState(databaseId, transactionId);
        return txnState == null ? "Transaction not found" : txnState.getReason();
    }
    return null;
}

public ReplicationJob(TTableReplicationRequest request) throws MetaNotFoundException {
Preconditions.checkState(request.src_table_type == TTableType.OLAP_TABLE);

Expand All @@ -416,6 +463,8 @@ public ReplicationJob(TTableReplicationRequest request) throws MetaNotFoundExcep
} else {
this.jobId = request.job_id;
}
this.createdTimeMs = System.currentTimeMillis();
this.finishedTimeMs = 0;
this.srcToken = request.src_token;
this.databaseId = request.database_id;
TableInfo tableInfo = initTableInfo(request);
Expand All @@ -440,6 +489,8 @@ public ReplicationJob(String jobId, String srcToken, long databaseId, OlapTable
} else {
this.jobId = jobId;
}
this.createdTimeMs = System.currentTimeMillis();
this.finishedTimeMs = 0;
this.srcToken = srcToken;
this.databaseId = databaseId;
TableInfo tableInfo = initTableInfo(table, srcTable, srcSystemInfoService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

Expand All @@ -57,23 +59,8 @@ public ReplicationMgr() {

/**
 * Runs one cycle of replication housekeeping: first drives every running job and files the
 * ones that finished under committed/aborted, then purges finished jobs that have expired.
 *
 * <p>The job-driving loop lives in {@code runRunningJobs()} only; repeating it inline here
 * would call {@code run()} on every running job twice per cycle.
 */
@Override
protected void runAfterCatalogReady() {
    runRunningJobs();
    clearExpiredJobs();
}

public void addReplicationJob(TTableReplicationRequest request) throws UserException {
Expand Down Expand Up @@ -119,18 +106,16 @@ public void addReplicationJob(ReplicationJob job) throws AlreadyExistsException
job.getDatabaseId(), job.getTableId(), job.getReplicationDataSize(), replicatingDataSizeMB);
}

public boolean hasRunningJobs() {
return !runningJobs.isEmpty();
public Collection<ReplicationJob> getRunningJobs() {
return runningJobs.values();
}

public boolean hasFailedJobs() {
return !abortedJobs.isEmpty();
public Collection<ReplicationJob> getCommittedJobs() {
return committedJobs.values();
}

public void clearFinishedJobs() {
committedJobs.clear();
abortedJobs.clear();
GlobalStateMgr.getServingState().getEditLog().logReplicationJob(null);
public Collection<ReplicationJob> getAbortedJobs() {
return abortedJobs.values();
}

public void cancelRunningJobs() {
Expand Down Expand Up @@ -170,12 +155,6 @@ public void finishReplicateSnapshotTask(ReplicateSnapshotTask task, TFinishTaskR
}

public void replayReplicationJob(ReplicationJob replicationJob) {
if (replicationJob == null) {
committedJobs.clear();
abortedJobs.clear();
return;
}

if (replicationJob.getState().equals(ReplicationJobState.COMMITTED)) {
committedJobs.put(replicationJob.getTableId(), replicationJob);
runningJobs.remove(replicationJob.getTableId());
Expand All @@ -187,6 +166,16 @@ public void replayReplicationJob(ReplicationJob replicationJob) {
}
}

/**
 * Replays the deletion of a replication job: removes the entry keyed by the job's table id
 * from the map that matches the job's recorded state (committed, aborted, otherwise running).
 *
 * @param replicationJob the job carried by the delete record
 */
public void replayDeleteReplicationJob(ReplicationJob replicationJob) {
    ReplicationJobState recordedState = replicationJob.getState();

    Map<Long, ReplicationJob> owner;
    if (recordedState.equals(ReplicationJobState.COMMITTED)) {
        owner = committedJobs;
    } else if (recordedState.equals(ReplicationJobState.ABORTED)) {
        owner = abortedJobs;
    } else {
        owner = runningJobs;
    }
    owner.remove(replicationJob.getTableId());
}

private long getReplicatingReplicaCount() {
long replicatingReplicaCount = 0;
for (ReplicationJob job : runningJobs.values()) {
Expand All @@ -203,6 +192,48 @@ private long getReplicatingDataSize() {
return replicatingDataSize;
}

/**
 * Drives every running job once and re-files the jobs that reached a final state.
 *
 * <p>A job that is COMMITTED or ABORTED after {@code run()} is stored in the matching map under
 * its table id. Removal from {@code runningJobs} is deferred until the iteration is over and
 * only happens if the table id is still mapped to that very job.
 */
private void runRunningJobs() {
    List<ReplicationJob> finishedJobs = Lists.newArrayList();
    for (ReplicationJob job : runningJobs.values()) {
        job.run();

        ReplicationJobState stateAfterRun = job.getState();
        boolean committed = stateAfterRun.equals(ReplicationJobState.COMMITTED);
        if (committed || stateAfterRun.equals(ReplicationJobState.ABORTED)) {
            finishedJobs.add(job);
            (committed ? committedJobs : abortedJobs).put(job.getTableId(), job);
        }
    }

    finishedJobs.forEach(job -> runningJobs.remove(job.getTableId(), job));
}

/**
 * Purges expired jobs, first from {@code committedJobs} and then from {@code abortedJobs}.
 * A delete record is written to the edit log for every job that is removed.
 */
private void clearExpiredJobs() {
    removeExpiredJobs(committedJobs);
    removeExpiredJobs(abortedJobs);
}

// Removes every expired job from the given map; the delete record is logged before the entry goes away.
private void removeExpiredJobs(Map<Long, ReplicationJob> jobs) {
    Iterator<ReplicationJob> cursor = jobs.values().iterator();
    while (cursor.hasNext()) {
        ReplicationJob candidate = cursor.next();
        if (candidate.isExpired()) {
            GlobalStateMgr.getServingState().getEditLog().logDeleteReplicationJob(candidate);
            cursor.remove();
        }
    }
}

public void save(DataOutputStream dos) throws IOException, SRMetaBlockException {
SRMetaBlockWriter writer = new SRMetaBlockWriter(dos, SRMetaBlockID.REPLICATION_MGR, 1);
writer.writeJson(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public void testRecoverableOperations() {
Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_DROP_STORAGE_VOLUME));
Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_ALTER_CATALOG));
Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_REPLICATION_JOB));
Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_DELETE_REPLICATION_JOB));
}

@Test
Expand Down
Loading

0 comments on commit d976483

Please sign in to comment.