diff --git a/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java b/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java index 7549e167c5215..4c6b88aa02599 100644 --- a/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java @@ -1148,6 +1148,11 @@ public void readFields(DataInput in) throws IOException { isRead = true; break; } + case OperationType.OP_DELETE_REPLICATION_JOB: { + data = ReplicationJobLog.read(in); + isRead = true; + break; + } case OperationType.OP_RECOVER_PARTITION_VERSION: { data = GsonUtils.GSON.fromJson(Text.readString(in), PartitionVersionRecoveryInfo.class); isRead = true; diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java b/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java index 68a2a8bb8eedc..729372483ae2f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java @@ -1185,6 +1185,11 @@ public void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity journal) globalStateMgr.getReplicationMgr().replayReplicationJob(replicationJobLog.getReplicationJob()); break; } + case OperationType.OP_DELETE_REPLICATION_JOB: { + ReplicationJobLog replicationJobLog = (ReplicationJobLog) journal.getData(); + globalStateMgr.getReplicationMgr().replayDeleteReplicationJob(replicationJobLog.getReplicationJob()); + break; + } case OperationType.OP_RECOVER_PARTITION_VERSION: { PartitionVersionRecoveryInfo info = (PartitionVersionRecoveryInfo) journal.getData(); GlobalStateMgr.getCurrentState().getMetaRecoveryDaemon().recoverPartitionVersion(info); @@ -1994,6 +1999,11 @@ public void logReplicationJob(ReplicationJob replicationJob) { logEdit(OperationType.OP_REPLICATION_JOB, replicationJobLog); } + public void logDeleteReplicationJob(ReplicationJob replicationJob) { + ReplicationJobLog replicationJobLog = new ReplicationJobLog(replicationJob); + logEdit(OperationType.OP_DELETE_REPLICATION_JOB, replicationJobLog); + } + public void logColumnRename(ColumnRenameInfo columnRenameInfo) { logJsonObject(OperationType.OP_RENAME_COLUMN_V2, columnRenameInfo); } diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java b/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java index d515b5cfa4c6a..e1999a2ca21c4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java @@ -926,6 +926,8 @@ public class OperationType { // Replication job @IgnorableOnReplayFailed public static final short OP_REPLICATION_JOB = 13500; + @IgnorableOnReplayFailed + public static final short OP_DELETE_REPLICATION_JOB = 13501; @IgnorableOnReplayFailed public static final short OP_DISABLE_TABLE_RECOVERY = 13510; diff --git a/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationMgr.java b/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationMgr.java index 5c4065437efb5..7b39854522ddc 100644 --- a/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/replication/ReplicationMgr.java @@ -118,12 +118,6 @@ public Collection getAbortedJobs() { return abortedJobs.values(); } - public void clearFinishedJobs() { - committedJobs.clear(); - abortedJobs.clear(); - GlobalStateMgr.getServingState().getEditLog().logReplicationJob(null); - } - public void cancelRunningJobs() { List toRemovedJobs = Lists.newArrayList(); for (ReplicationJob job : runningJobs.values()) { @@ -161,28 +155,24 @@ public void finishReplicateSnapshotTask(ReplicateSnapshotTask task, TFinishTaskR } public void replayReplicationJob(ReplicationJob replicationJob) { - if (replicationJob == null) { - committedJobs.clear(); - abortedJobs.clear(); - return; + if (replicationJob.getState().equals(ReplicationJobState.COMMITTED)) { + committedJobs.put(replicationJob.getTableId(), replicationJob); + runningJobs.remove(replicationJob.getTableId()); + } else if (replicationJob.getState().equals(ReplicationJobState.ABORTED)) { + abortedJobs.put(replicationJob.getTableId(), replicationJob); + runningJobs.remove(replicationJob.getTableId()); + } else { + runningJobs.put(replicationJob.getTableId(), replicationJob); } + } + public void replayDeleteReplicationJob(ReplicationJob replicationJob) { if (replicationJob.getState().equals(ReplicationJobState.COMMITTED)) { - if (replicationJob.isExpired()) { - committedJobs.remove(replicationJob.getTableId()); - } else { - committedJobs.put(replicationJob.getTableId(), replicationJob); - runningJobs.remove(replicationJob.getTableId()); - } + committedJobs.remove(replicationJob.getTableId()); } else if (replicationJob.getState().equals(ReplicationJobState.ABORTED)) { - if (replicationJob.isExpired()) { - abortedJobs.remove(replicationJob.getTableId()); - } else { - abortedJobs.put(replicationJob.getTableId(), replicationJob); - runningJobs.remove(replicationJob.getTableId()); - } + abortedJobs.remove(replicationJob.getTableId()); } else { - runningJobs.put(replicationJob.getTableId(), replicationJob); + runningJobs.remove(replicationJob.getTableId()); } } @@ -229,7 +219,7 @@ private void clearExpiredJobs() { continue; } - GlobalStateMgr.getServingState().getEditLog().logReplicationJob(job); + GlobalStateMgr.getServingState().getEditLog().logDeleteReplicationJob(job); it.remove(); } @@ -239,7 +229,7 @@ private void clearExpiredJobs() { continue; } - GlobalStateMgr.getServingState().getEditLog().logReplicationJob(job); + GlobalStateMgr.getServingState().getEditLog().logDeleteReplicationJob(job); it.remove(); } } diff --git a/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java b/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java index ebc49593ef367..6d1d5e62ae688 100644 --- a/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java @@ -206,6 +206,7 @@ public void testRecoverableOperations() { Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_DROP_DICTIONARY)); Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_MODIFY_DICTIONARY_MGR)); Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_REPLICATION_JOB)); + Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_DELETE_REPLICATION_JOB)); } @Test diff --git a/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationMgrTest.java index 602821bd306b3..5b359868a5b91 100644 --- a/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationMgrTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/replication/ReplicationMgrTest.java @@ -178,7 +178,7 @@ public void testNormal() throws Exception { replicationMgr.runAfterCatalogReady(); Assert.assertTrue(replicationMgr.getCommittedJobs().isEmpty()); - replicationMgr.replayReplicationJob(job); + replicationMgr.replayDeleteReplicationJob(job); Assert.assertTrue(replicationMgr.getCommittedJobs().isEmpty()); Config.history_job_keep_max_second = old; @@ -196,9 +196,6 @@ public void testInitializingCancel() { Assert.assertTrue(replicationMgr.getRunningJobs().isEmpty()); Assert.assertFalse(replicationMgr.getAbortedJobs().isEmpty()); - - replicationMgr.clearFinishedJobs(); - replicationMgr.replayReplicationJob(null); } @Test @@ -224,7 +221,7 @@ public void testSnapshotingCancel() { replicationMgr.runAfterCatalogReady(); Assert.assertTrue(replicationMgr.getAbortedJobs().isEmpty()); - replicationMgr.replayReplicationJob(job); + replicationMgr.replayDeleteReplicationJob(job); Assert.assertTrue(replicationMgr.getAbortedJobs().isEmpty()); Config.history_job_keep_max_second = old;