Skip to content

Commit

Permalink
[Enhancement] Support show proc replications
Browse files Browse the repository at this point in the history
Signed-off-by: xiangguangyxg <[email protected]>
  • Loading branch information
xiangguangyxg committed Sep 2, 2024
1 parent 9126be9 commit 88115af
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ private ProcService() {
root.register("catalog", GlobalStateMgr.getCurrentState().getCatalogMgr().getProcNode());
root.register("compactions", new CompactionsProcNode());
root.register("meta_recovery", new MetaRecoveryProdDir());
root.register("replications", new ReplicationsProcNode());
}

// Get the corresponding PROC Node by the specified path
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.common.proc;

import com.google.common.base.Strings;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.replication.ReplicationJob;
import com.starrocks.replication.ReplicationMgr;
import com.starrocks.server.GlobalStateMgr;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ReplicationsProcNode implements ProcNodeInterface {
private static final List<String> TITLES = Collections.unmodifiableList(Arrays.asList(
"JobID", "DatabaseID", "TableID", "TxnID", "CreatedTime", "FinishedTime", "State", "Progress", "Error"));

public ReplicationsProcNode() {
}

@Override
public ProcResult fetchResult() throws AnalysisException {
BaseProcResult result = new BaseProcResult();
result.setNames(TITLES);
ReplicationMgr replicationMgr = GlobalStateMgr.getCurrentState().getReplicationMgr();
List<ReplicationJob> runningJobs = new ArrayList<>(replicationMgr.getRunningJobs());
List<ReplicationJob> abortedJobs = new ArrayList<>(replicationMgr.getAbortedJobs());
List<ReplicationJob> committedJobs = new ArrayList<>(replicationMgr.getCommittedJobs());
runningJobs.sort((job1, job2) -> Long.compare(job1.getCreatedTimeMs(), job2.getCreatedTimeMs()));
abortedJobs.sort((job1, job2) -> Long.compare(job1.getCreatedTimeMs(), job2.getCreatedTimeMs()));
committedJobs.sort((job1, job2) -> Long.compare(job1.getCreatedTimeMs(), job2.getCreatedTimeMs()));
List<ReplicationJob> replicationJobs = new ArrayList<>();
replicationJobs.addAll(runningJobs);
replicationJobs.addAll(abortedJobs);
replicationJobs.addAll(committedJobs);
for (ReplicationJob job : replicationJobs) {
List<String> row = new ArrayList<>();
row.add(job.getJobId());
row.add(String.valueOf(job.getDatabaseId()));
row.add(String.valueOf(job.getTableId()));
row.add(String.valueOf(job.getTransactionId()));
row.add(TimeUtils.longToTimeString(job.getCreatedTimeMs()));
row.add(TimeUtils.longToTimeString(job.getFinishedTimeMs()));
row.add(job.getState().toString());
row.add(job.getRunningTaskNum() + "/" + job.getTotalTaskNum());
row.add(Strings.nullToEmpty(job.getErrorMessage()));
result.addRow(row);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,12 @@ public TSnapshotInfo getSnapshotInfo() {
@SerializedName(value = "jobId")
private final String jobId;

@SerializedName(value = "createdTimeMs")
private final long createdTimeMs;

@SerializedName(value = "finishedTimeMs")
private volatile long finishedTimeMs;

@SerializedName(value = "srcToken")
private final String srcToken;

Expand Down Expand Up @@ -392,6 +398,14 @@ public String getJobId() {
return jobId;
}

public long getCreatedTimeMs() {
return createdTimeMs;
}

public long getFinishedTimeMs() {
return finishedTimeMs;
}

public long getDatabaseId() {
return databaseId;
}
Expand All @@ -400,6 +414,10 @@ public long getTableId() {
return tableId;
}

public long getTransactionId() {
return transactionId;
}

public long getReplicationDataSize() {
return replicationDataSize;
}
Expand All @@ -414,11 +432,40 @@ public ReplicationJobState getState() {

private void setState(ReplicationJobState state) {
this.state = state;
if (state.equals(ReplicationJobState.COMMITTED) || state.equals(ReplicationJobState.ABORTED)) {
finishedTimeMs = System.currentTimeMillis();
}
GlobalStateMgr.getServingState().getEditLog().logReplicationJob(this);
LOG.info("Replication job state: {}, database id: {}, table id: {}, transaction id: {}",
state, databaseId, tableId, transactionId);
}

public boolean isExpired() {
return finishedTimeMs > 0 &&
(System.currentTimeMillis() - finishedTimeMs) > (Config.history_job_keep_max_second * 1000);
}

public long getRunningTaskNum() {
return runningTasks.size();
}

public long getTotalTaskNum() {
return taskNum;
}

public String getErrorMessage() {
if (!state.equals(ReplicationJobState.ABORTED)) {
return null;
}

TransactionState transactionState = GlobalStateMgr.getServingState().getGlobalTransactionMgr()
.getTransactionState(databaseId, transactionId);
if (transactionState == null) {
return "Transaction not found";
}
return transactionState.getReason();
}

public ReplicationJob(TTableReplicationRequest request) throws MetaNotFoundException {
Preconditions.checkState(request.src_table_type == TTableType.OLAP_TABLE);

Expand All @@ -427,6 +474,8 @@ public ReplicationJob(TTableReplicationRequest request) throws MetaNotFoundExcep
} else {
this.jobId = request.job_id;
}
this.createdTimeMs = System.currentTimeMillis();
this.finishedTimeMs = 0;
this.srcToken = request.src_token;
this.databaseId = request.database_id;
TableInfo tableInfo = initTableInfo(request);
Expand All @@ -451,6 +500,8 @@ public ReplicationJob(String jobId, String srcToken, long databaseId, OlapTable
} else {
this.jobId = jobId;
}
this.createdTimeMs = System.currentTimeMillis();
this.finishedTimeMs = 0;
this.srcToken = srcToken;
this.databaseId = databaseId;
TableInfo tableInfo = initTableInfo(table, srcTable, srcSystemInfoService);
Expand Down Expand Up @@ -831,7 +882,8 @@ private boolean isTransactionAborted() {

if (txnState.getTransactionStatus() == TransactionStatus.PREPARE) {
Database db = GlobalStateMgr.getServingState().getLocalMetastore().getDb(databaseId);
if (db == null || GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getId(), tableId) == null) {
if (db == null
|| GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getId(), tableId) == null) {
abortTransaction("Table is deleted");
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

Expand All @@ -57,23 +59,8 @@ public ReplicationMgr() {

@Override
protected void runAfterCatalogReady() {
List<ReplicationJob> toRemovedJobs = Lists.newArrayList();
for (ReplicationJob job : runningJobs.values()) {
job.run();

ReplicationJobState state = job.getState();
if (state.equals(ReplicationJobState.COMMITTED)) {
toRemovedJobs.add(job);
committedJobs.put(job.getTableId(), job);
} else if (state.equals(ReplicationJobState.ABORTED)) {
toRemovedJobs.add(job);
abortedJobs.put(job.getTableId(), job);
}
}

for (ReplicationJob job : toRemovedJobs) {
runningJobs.remove(job.getTableId(), job);
}
runRunningJobs();
clearExpiredJobs();
}

public void addReplicationJob(TTableReplicationRequest request) throws UserException {
Expand Down Expand Up @@ -119,12 +106,16 @@ public void addReplicationJob(ReplicationJob job) throws AlreadyExistsException
job.getDatabaseId(), job.getTableId(), job.getReplicationDataSize(), replicatingDataSizeMB);
}

public boolean hasRunningJobs() {
return !runningJobs.isEmpty();
public Collection<ReplicationJob> getRunningJobs() {
return runningJobs.values();
}

public Collection<ReplicationJob> getCommittedJobs() {
return committedJobs.values();
}

public boolean hasFailedJobs() {
return !abortedJobs.isEmpty();
public Collection<ReplicationJob> getAbortedJobs() {
return abortedJobs.values();
}

public void clearFinishedJobs() {
Expand Down Expand Up @@ -177,11 +168,19 @@ public void replayReplicationJob(ReplicationJob replicationJob) {
}

if (replicationJob.getState().equals(ReplicationJobState.COMMITTED)) {
committedJobs.put(replicationJob.getTableId(), replicationJob);
runningJobs.remove(replicationJob.getTableId());
if (replicationJob.isExpired()) {
committedJobs.remove(replicationJob.getTableId());
} else {
committedJobs.put(replicationJob.getTableId(), replicationJob);
runningJobs.remove(replicationJob.getTableId());
}
} else if (replicationJob.getState().equals(ReplicationJobState.ABORTED)) {
abortedJobs.put(replicationJob.getTableId(), replicationJob);
runningJobs.remove(replicationJob.getTableId());
if (replicationJob.isExpired()) {
abortedJobs.remove(replicationJob.getTableId());
} else {
abortedJobs.put(replicationJob.getTableId(), replicationJob);
runningJobs.remove(replicationJob.getTableId());
}
} else {
runningJobs.put(replicationJob.getTableId(), replicationJob);
}
Expand All @@ -203,6 +202,48 @@ private long getReplicatingDataSize() {
return replicatingDataSize;
}

private void runRunningJobs() {
List<ReplicationJob> toRemovedJobs = Lists.newArrayList();
for (ReplicationJob job : runningJobs.values()) {
job.run();

ReplicationJobState state = job.getState();
if (state.equals(ReplicationJobState.COMMITTED)) {
toRemovedJobs.add(job);
committedJobs.put(job.getTableId(), job);
} else if (state.equals(ReplicationJobState.ABORTED)) {
toRemovedJobs.add(job);
abortedJobs.put(job.getTableId(), job);
}
}

for (ReplicationJob job : toRemovedJobs) {
runningJobs.remove(job.getTableId(), job);
}
}

private void clearExpiredJobs() {
for (Iterator<Map.Entry<Long, ReplicationJob>> it = committedJobs.entrySet().iterator(); it.hasNext();) {
ReplicationJob job = it.next().getValue();
if (!job.isExpired()) {
continue;
}

GlobalStateMgr.getServingState().getEditLog().logReplicationJob(job);
it.remove();
}

for (Iterator<Map.Entry<Long, ReplicationJob>> it = abortedJobs.entrySet().iterator(); it.hasNext();) {
ReplicationJob job = it.next().getValue();
if (!job.isExpired()) {
continue;
}

GlobalStateMgr.getServingState().getEditLog().logReplicationJob(job);
it.remove();
}
}

public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockException {
SRMetaBlockWriter writer = imageWriter.getBlockWriter(SRMetaBlockID.REPLICATION_MGR, 1);
writer.writeJson(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.Tablet;
import com.starrocks.common.Config;
import com.starrocks.common.io.DeepCopy;
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.common.proc.ReplicationsProcNode;
import com.starrocks.leader.LeaderImpl;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.analyzer.AnalyzeTestUtil;
Expand Down Expand Up @@ -165,6 +167,21 @@ public void testNormal() throws Exception {
Assert.assertEquals(partition.getCommittedDataVersion(), srcPartition.getDataVersion());

replicationMgr.replayReplicationJob(job);

Assert.assertTrue(replicationMgr.getRunningJobs().isEmpty());
Assert.assertFalse(replicationMgr.getCommittedJobs().isEmpty());

int old = Config.history_job_keep_max_second;
Config.history_job_keep_max_second = -1;
Assert.assertTrue(job.isExpired());

replicationMgr.runAfterCatalogReady();
Assert.assertTrue(replicationMgr.getCommittedJobs().isEmpty());

replicationMgr.replayReplicationJob(job);
Assert.assertTrue(replicationMgr.getCommittedJobs().isEmpty());

Config.history_job_keep_max_second = old;
}

@Test
Expand All @@ -177,8 +194,8 @@ public void testInitializingCancel() {
replicationMgr.runAfterCatalogReady();
Assert.assertEquals(ReplicationJobState.ABORTED, job.getState());

Assert.assertFalse(replicationMgr.hasRunningJobs());
Assert.assertTrue(replicationMgr.hasFailedJobs());
Assert.assertTrue(replicationMgr.getRunningJobs().isEmpty());
Assert.assertFalse(replicationMgr.getAbortedJobs().isEmpty());

replicationMgr.clearFinishedJobs();
replicationMgr.replayReplicationJob(null);
Expand All @@ -196,6 +213,21 @@ public void testSnapshotingCancel() {

replicationMgr.runAfterCatalogReady();
Assert.assertEquals(ReplicationJobState.ABORTED, job.getState());

Assert.assertTrue(replicationMgr.getRunningJobs().isEmpty());
Assert.assertFalse(replicationMgr.getAbortedJobs().isEmpty());

int old = Config.history_job_keep_max_second;
Config.history_job_keep_max_second = -1;
Assert.assertTrue(job.isExpired());

replicationMgr.runAfterCatalogReady();
Assert.assertTrue(replicationMgr.getAbortedJobs().isEmpty());

replicationMgr.replayReplicationJob(job);
Assert.assertTrue(replicationMgr.getAbortedJobs().isEmpty());

Config.history_job_keep_max_second = old;
}

@Test
Expand Down Expand Up @@ -372,4 +404,14 @@ private static TSnapshotInfo newTSnapshotInfo(TBackend backend, String snapshotP
tSnapshotInfo.setIncremental_snapshot(incrementalSnapshot);
return tSnapshotInfo;
}

@Test
public void testReplicationsProcNode() {
ReplicationsProcNode procNode = new ReplicationsProcNode();
try {
procNode.fetchResult();
} catch (Exception e) {
Assert.assertNull(e);
}
}
}

0 comments on commit 88115af

Please sign in to comment.