From d97648366aba82f17e32170d6dcf03fe33352a2a Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 3 Sep 2024 10:10:15 +0800 Subject: [PATCH] [Enhancement] Support show proc replications (backport #50483) (#50569) Signed-off-by: xiangguangyxg Co-authored-by: xiangguangyxg <110401425+xiangguangyxg@users.noreply.github.com> --- .../starrocks/common/proc/ProcService.java | 1 + .../common/proc/ReplicationsProcNode.java | 66 +++++++++++++ .../com/starrocks/journal/JournalEntity.java | 5 + .../java/com/starrocks/persist/EditLog.java | 10 ++ .../com/starrocks/persist/OperationType.java | 2 + .../starrocks/replication/ReplicationJob.java | 51 ++++++++++ .../starrocks/replication/ReplicationMgr.java | 93 ++++++++++++------- .../starrocks/persist/OperationTypeTest.java | 1 + .../replication/ReplicationMgrTest.java | 49 +++++++++- 9 files changed, 242 insertions(+), 36 deletions(-) create mode 100644 fe/fe-core/src/main/java/com/starrocks/common/proc/ReplicationsProcNode.java diff --git a/fe/fe-core/src/main/java/com/starrocks/common/proc/ProcService.java b/fe/fe-core/src/main/java/com/starrocks/common/proc/ProcService.java index 8a8f271b9803d..19df01a2b2edb 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/proc/ProcService.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/proc/ProcService.java @@ -73,6 +73,7 @@ private ProcService() { root.register("compactions", new CompactionsProcNode()); root.register("warehouses", new WarehouseProcDir(GlobalStateMgr.getCurrentWarehouseMgr())); root.register("meta_recovery", new MetaRecoveryProdDir()); + root.register("replications", new ReplicationsProcNode()); } // Get the corresponding PROC Node by the specified path diff --git a/fe/fe-core/src/main/java/com/starrocks/common/proc/ReplicationsProcNode.java b/fe/fe-core/src/main/java/com/starrocks/common/proc/ReplicationsProcNode.java new file mode 100644 index 0000000000000..cb98f5202b6d9 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/common/proc/ReplicationsProcNode.java @@ -0,0 +1,66 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.common.proc; + +import com.google.common.base.Strings; +import com.starrocks.common.AnalysisException; +import com.starrocks.common.util.TimeUtils; +import com.starrocks.replication.ReplicationJob; +import com.starrocks.replication.ReplicationMgr; +import com.starrocks.server.GlobalStateMgr; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class ReplicationsProcNode implements ProcNodeInterface { + private static final List TITLES = Collections.unmodifiableList(Arrays.asList( + "JobID", "DatabaseID", "TableID", "TxnID", "CreatedTime", "FinishedTime", "State", "Progress", "Error")); + + public ReplicationsProcNode() { + } + + @Override + public ProcResult fetchResult() throws AnalysisException { + BaseProcResult result = new BaseProcResult(); + result.setNames(TITLES); + ReplicationMgr replicationMgr = GlobalStateMgr.getCurrentState().getReplicationMgr(); + List runningJobs = new ArrayList<>(replicationMgr.getRunningJobs()); + List abortedJobs = new ArrayList<>(replicationMgr.getAbortedJobs()); + List committedJobs = new ArrayList<>(replicationMgr.getCommittedJobs()); + runningJobs.sort((job1, job2) -> Long.compare(job1.getCreatedTimeMs(), job2.getCreatedTimeMs())); + abortedJobs.sort((job1, job2) -> Long.compare(job1.getCreatedTimeMs(), job2.getCreatedTimeMs())); + committedJobs.sort((job1, job2) -> Long.compare(job1.getCreatedTimeMs(), job2.getCreatedTimeMs())); + List replicationJobs = new ArrayList<>(); + replicationJobs.addAll(runningJobs); + replicationJobs.addAll(abortedJobs); + replicationJobs.addAll(committedJobs); + for (ReplicationJob job : replicationJobs) { + List row = new ArrayList<>(); + row.add(job.getJobId()); + row.add(String.valueOf(job.getDatabaseId())); + row.add(String.valueOf(job.getTableId())); + row.add(String.valueOf(job.getTransactionId())); + row.add(TimeUtils.longToTimeString(job.getCreatedTimeMs())); + row.add(TimeUtils.longToTimeString(job.getFinishedTimeMs())); + row.add(job.getState().toString()); + row.add(job.getRunningTaskNum() + "/" + job.getTotalTaskNum()); + row.add(Strings.nullToEmpty(job.getErrorMessage())); + result.addRow(row); + } + return result; + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java b/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java index aec2cee4c8a62..9eb6704eeb24a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java @@ -1059,6 +1059,11 @@ public void readFields(DataInput in) throws IOException { data = ReplicationJobLog.read(in); isRead = true; break; + case OperationType.OP_DELETE_REPLICATION_JOB: { + data = ReplicationJobLog.read(in); + isRead = true; + break; + } case OperationType.OP_RECOVER_PARTITION_VERSION: data = GsonUtils.GSON.fromJson(Text.readString(in), PartitionVersionRecoveryInfo.class); isRead = true; diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java b/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java index 879c9a5963c60..836ecada1c9f4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java @@ -1097,6 +1097,11 @@ public static void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity jour globalStateMgr.getReplicationMgr().replayReplicationJob(replicationJobLog.getReplicationJob()); break; } + case OperationType.OP_DELETE_REPLICATION_JOB: { + ReplicationJobLog replicationJobLog = (ReplicationJobLog) journal.getData(); + globalStateMgr.getReplicationMgr().replayDeleteReplicationJob(replicationJobLog.getReplicationJob()); + break; + } case OperationType.OP_RECOVER_PARTITION_VERSION: { PartitionVersionRecoveryInfo info = (PartitionVersionRecoveryInfo) journal.getData(); GlobalStateMgr.getCurrentState().getMetaRecoveryDaemon().recoverPartitionVersion(info); @@ -2119,6 +2124,11 @@ public void logReplicationJob(ReplicationJob replicationJob) { logEdit(OperationType.OP_REPLICATION_JOB, replicationJobLog); } + public void logDeleteReplicationJob(ReplicationJob replicationJob) { + ReplicationJobLog replicationJobLog = new ReplicationJobLog(replicationJob); + logEdit(OperationType.OP_DELETE_REPLICATION_JOB, replicationJobLog); + } + public void logRecoverPartitionVersion(PartitionVersionRecoveryInfo info) { logEdit(OperationType.OP_RECOVER_PARTITION_VERSION, info); } diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java b/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java index c50766e50ae98..65b63639dae3c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java @@ -853,6 +853,8 @@ public class OperationType { // Replication job @IgnorableOnReplayFailed public static final short OP_REPLICATION_JOB = 13500; + @IgnorableOnReplayFailed + public static final short OP_DELETE_REPLICATION_JOB = 13501; public static final short OP_DISABLE_TABLE_RECOVERY = 13510; diff --git a/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationJob.java b/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationJob.java index e7a576cc8fa3c..b36c898646793 100644 --- a/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationJob.java @@ -343,6 +343,12 @@ public TSnapshotInfo getSnapshotInfo() { @SerializedName(value = "jobId") private final String jobId; + @SerializedName(value = "createdTimeMs") + private final long createdTimeMs; + + @SerializedName(value = "finishedTimeMs") + private volatile long finishedTimeMs; + @SerializedName(value = "srcToken") private final String srcToken; @@ -381,6 +387,14 @@ public String getJobId() { return jobId; } + public long getCreatedTimeMs() { + return createdTimeMs; + } + + public long getFinishedTimeMs() { + return finishedTimeMs; + } + public long getDatabaseId() { return databaseId; } @@ -389,6 +403,10 @@ public long getTableId() { return tableId; } + public long getTransactionId() { + return transactionId; + } + public long getReplicationDataSize() { return replicationDataSize; } @@ -403,11 +421,40 @@ public ReplicationJobState getState() { private void setState(ReplicationJobState state) { this.state = state; + if (state.equals(ReplicationJobState.COMMITTED) || state.equals(ReplicationJobState.ABORTED)) { + finishedTimeMs = System.currentTimeMillis(); + } GlobalStateMgr.getServingState().getEditLog().logReplicationJob(this); LOG.info("Replication job state: {}, database id: {}, table id: {}, transaction id: {}", state, databaseId, tableId, transactionId); } + public boolean isExpired() { + return finishedTimeMs > 0 && + (System.currentTimeMillis() - finishedTimeMs) > (Config.history_job_keep_max_second * 1000); + } + + public long getRunningTaskNum() { + return runningTasks.size(); + } + + public long getTotalTaskNum() { + return taskNum; + } + + public String getErrorMessage() { + if (!state.equals(ReplicationJobState.ABORTED)) { + return null; + } + + TransactionState transactionState = GlobalStateMgr.getServingState().getGlobalTransactionMgr() + .getTransactionState(databaseId, transactionId); + if (transactionState == null) { + return "Transaction not found"; + } + return transactionState.getReason(); + } + public ReplicationJob(TTableReplicationRequest request) throws MetaNotFoundException { Preconditions.checkState(request.src_table_type == TTableType.OLAP_TABLE); @@ -416,6 +463,8 @@ public ReplicationJob(TTableReplicationRequest request) throws MetaNotFoundExcep } else { this.jobId = request.job_id; } + this.createdTimeMs = System.currentTimeMillis(); + this.finishedTimeMs = 0; this.srcToken = request.src_token; this.databaseId = request.database_id; TableInfo tableInfo = initTableInfo(request); @@ -440,6 +489,8 @@ public ReplicationJob(String jobId, String srcToken, long databaseId, OlapTable } else { this.jobId = jobId; } + this.createdTimeMs = System.currentTimeMillis(); + this.finishedTimeMs = 0; this.srcToken = srcToken; this.databaseId = databaseId; TableInfo tableInfo = initTableInfo(table, srcTable, srcSystemInfoService); diff --git a/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationMgr.java b/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationMgr.java index f7c80cfe79668..96bac91983342 100644 --- a/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationMgr.java @@ -36,6 +36,8 @@ import java.io.DataOutputStream; import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -57,23 +59,8 @@ public ReplicationMgr() { @Override protected void runAfterCatalogReady() { - List toRemovedJobs = Lists.newArrayList(); - for (ReplicationJob job : runningJobs.values()) { - job.run(); - - ReplicationJobState state = job.getState(); - if (state.equals(ReplicationJobState.COMMITTED)) { - toRemovedJobs.add(job); - committedJobs.put(job.getTableId(), job); - } else if (state.equals(ReplicationJobState.ABORTED)) { - toRemovedJobs.add(job); - abortedJobs.put(job.getTableId(), job); - } - } - - for (ReplicationJob job : toRemovedJobs) { - runningJobs.remove(job.getTableId(), job); - } + runRunningJobs(); + clearExpiredJobs(); } public void addReplicationJob(TTableReplicationRequest request) throws UserException { @@ -119,18 +106,16 @@ public void addReplicationJob(ReplicationJob job) throws AlreadyExistsException job.getDatabaseId(), job.getTableId(), job.getReplicationDataSize(), replicatingDataSizeMB); } - public boolean hasRunningJobs() { - return !runningJobs.isEmpty(); + public Collection getRunningJobs() { + return runningJobs.values(); } - public boolean hasFailedJobs() { - return !abortedJobs.isEmpty(); + public Collection getCommittedJobs() { + return committedJobs.values(); } - public void clearFinishedJobs() { - committedJobs.clear(); - abortedJobs.clear(); - GlobalStateMgr.getServingState().getEditLog().logReplicationJob(null); + public Collection getAbortedJobs() { + return abortedJobs.values(); } public void cancelRunningJobs() { @@ -170,12 +155,6 @@ public void finishReplicateSnapshotTask(ReplicateSnapshotTask task, TFinishTaskR } public void replayReplicationJob(ReplicationJob replicationJob) { - if (replicationJob == null) { - committedJobs.clear(); - abortedJobs.clear(); - return; - } - if (replicationJob.getState().equals(ReplicationJobState.COMMITTED)) { committedJobs.put(replicationJob.getTableId(), replicationJob); runningJobs.remove(replicationJob.getTableId()); @@ -187,6 +166,16 @@ public void replayReplicationJob(ReplicationJob replicationJob) { } } + public void replayDeleteReplicationJob(ReplicationJob replicationJob) { + if (replicationJob.getState().equals(ReplicationJobState.COMMITTED)) { + committedJobs.remove(replicationJob.getTableId()); + } else if (replicationJob.getState().equals(ReplicationJobState.ABORTED)) { + abortedJobs.remove(replicationJob.getTableId()); + } else { + runningJobs.remove(replicationJob.getTableId()); + } + } + private long getReplicatingReplicaCount() { long replicatingReplicaCount = 0; for (ReplicationJob job : runningJobs.values()) { @@ -203,6 +192,48 @@ private long getReplicatingDataSize() { return replicatingDataSize; } + private void runRunningJobs() { + List toRemovedJobs = Lists.newArrayList(); + for (ReplicationJob job : runningJobs.values()) { + job.run(); + + ReplicationJobState state = job.getState(); + if (state.equals(ReplicationJobState.COMMITTED)) { + toRemovedJobs.add(job); + committedJobs.put(job.getTableId(), job); + } else if (state.equals(ReplicationJobState.ABORTED)) { + toRemovedJobs.add(job); + abortedJobs.put(job.getTableId(), job); + } + } + + for (ReplicationJob job : toRemovedJobs) { + runningJobs.remove(job.getTableId(), job); + } + } + + private void clearExpiredJobs() { + for (Iterator> it = committedJobs.entrySet().iterator(); it.hasNext();) { + ReplicationJob job = it.next().getValue(); + if (!job.isExpired()) { + continue; + } + + GlobalStateMgr.getServingState().getEditLog().logDeleteReplicationJob(job); + it.remove(); + } + + for (Iterator> it = abortedJobs.entrySet().iterator(); it.hasNext();) { + ReplicationJob job = it.next().getValue(); + if (!job.isExpired()) { + continue; + } + + GlobalStateMgr.getServingState().getEditLog().logDeleteReplicationJob(job); + it.remove(); + } + } + public void save(DataOutputStream dos) throws IOException, SRMetaBlockException { SRMetaBlockWriter writer = new SRMetaBlockWriter(dos, SRMetaBlockID.REPLICATION_MGR, 1); writer.writeJson(this); diff --git a/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java b/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java index 87e1585885fb6..7f24b95027fe2 100644 --- a/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java @@ -188,6 +188,7 @@ public void testRecoverableOperations() { Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_DROP_STORAGE_VOLUME)); Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_ALTER_CATALOG)); Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_REPLICATION_JOB)); + Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_DELETE_REPLICATION_JOB)); } @Test diff --git a/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationMgrTest.java index 68d4233c6f41c..0228692801451 100644 --- a/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationMgrTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationMgrTest.java @@ -19,8 +19,10 @@ import com.starrocks.catalog.OlapTable; import com.starrocks.catalog.Partition; import com.starrocks.catalog.Tablet; +import com.starrocks.common.Config; import com.starrocks.common.io.DeepCopy; import com.starrocks.common.jmockit.Deencapsulation; +import com.starrocks.common.proc.ReplicationsProcNode; import com.starrocks.leader.LeaderImpl; import com.starrocks.server.GlobalStateMgr; import com.starrocks.sql.analyzer.AnalyzeTestUtil; @@ -158,6 +160,21 @@ public void testNormal() throws Exception { Assert.assertEquals(partition.getCommittedVersion(), srcPartition.getVisibleVersion()); replicationMgr.replayReplicationJob(job); + + Assert.assertTrue(replicationMgr.getRunningJobs().isEmpty()); + Assert.assertFalse(replicationMgr.getCommittedJobs().isEmpty()); + + int old = Config.history_job_keep_max_second; + Config.history_job_keep_max_second = -1; + Assert.assertTrue(job.isExpired()); + + replicationMgr.runAfterCatalogReady(); + Assert.assertTrue(replicationMgr.getCommittedJobs().isEmpty()); + + replicationMgr.replayDeleteReplicationJob(job); + Assert.assertTrue(replicationMgr.getCommittedJobs().isEmpty()); + + Config.history_job_keep_max_second = old; } @Test @@ -170,11 +187,8 @@ public void testInitializingCancel() { replicationMgr.runAfterCatalogReady(); Assert.assertEquals(ReplicationJobState.ABORTED, job.getState()); - Assert.assertFalse(replicationMgr.hasRunningJobs()); - Assert.assertTrue(replicationMgr.hasFailedJobs()); - - replicationMgr.clearFinishedJobs(); - replicationMgr.replayReplicationJob(null); + Assert.assertTrue(replicationMgr.getRunningJobs().isEmpty()); + Assert.assertFalse(replicationMgr.getAbortedJobs().isEmpty()); } @Test @@ -189,6 +203,21 @@ public void testSnapshotingCancel() { replicationMgr.runAfterCatalogReady(); Assert.assertEquals(ReplicationJobState.ABORTED, job.getState()); + + Assert.assertTrue(replicationMgr.getRunningJobs().isEmpty()); + Assert.assertFalse(replicationMgr.getAbortedJobs().isEmpty()); + + int old = Config.history_job_keep_max_second; + Config.history_job_keep_max_second = -1; + Assert.assertTrue(job.isExpired()); + + replicationMgr.runAfterCatalogReady(); + Assert.assertTrue(replicationMgr.getAbortedJobs().isEmpty()); + + replicationMgr.replayDeleteReplicationJob(job); + Assert.assertTrue(replicationMgr.getAbortedJobs().isEmpty()); + + Config.history_job_keep_max_second = old; } @Test @@ -364,4 +393,14 @@ private static TSnapshotInfo newTSnapshotInfo(TBackend backend, String snapshotP tSnapshotInfo.setIncremental_snapshot(incrementalSnapshot); return tSnapshotInfo; } + + @Test + public void testReplicationsProcNode() { + ReplicationsProcNode procNode = new ReplicationsProcNode(); + try { + procNode.fetchResult(); + } catch (Exception e) { + Assert.assertNull(e); + } + } } \ No newline at end of file