Skip to content

Commit

Permalink
add delete replication job operation
Browse files Browse the repository at this point in the history
Signed-off-by: xiangguangyxg <[email protected]>
  • Loading branch information
xiangguangyxg committed Sep 2, 2024
1 parent 88115af commit fce4ce5
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,11 @@ public void readFields(DataInput in) throws IOException {
isRead = true;
break;
}
case OperationType.OP_DELETE_REPLICATION_JOB: {
data = ReplicationJobLog.read(in);
isRead = true;
break;
}
case OperationType.OP_RECOVER_PARTITION_VERSION: {
data = GsonUtils.GSON.fromJson(Text.readString(in), PartitionVersionRecoveryInfo.class);
isRead = true;
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,11 @@ public void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity journal)
globalStateMgr.getReplicationMgr().replayReplicationJob(replicationJobLog.getReplicationJob());
break;
}
case OperationType.OP_DELETE_REPLICATION_JOB: {
ReplicationJobLog replicationJobLog = (ReplicationJobLog) journal.getData();
globalStateMgr.getReplicationMgr().replayDeleteReplicationJob(replicationJobLog.getReplicationJob());
break;
}
case OperationType.OP_RECOVER_PARTITION_VERSION: {
PartitionVersionRecoveryInfo info = (PartitionVersionRecoveryInfo) journal.getData();
GlobalStateMgr.getCurrentState().getMetaRecoveryDaemon().recoverPartitionVersion(info);
Expand Down Expand Up @@ -1994,6 +1999,11 @@ public void logReplicationJob(ReplicationJob replicationJob) {
logEdit(OperationType.OP_REPLICATION_JOB, replicationJobLog);
}

public void logDeleteReplicationJob(ReplicationJob replicationJob) {
ReplicationJobLog replicationJobLog = new ReplicationJobLog(replicationJob);
logEdit(OperationType.OP_DELETE_REPLICATION_JOB, replicationJobLog);
}

public void logColumnRename(ColumnRenameInfo columnRenameInfo) {
logJsonObject(OperationType.OP_RENAME_COLUMN_V2, columnRenameInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,8 @@ public class OperationType {
// Replication job
@IgnorableOnReplayFailed
public static final short OP_REPLICATION_JOB = 13500;
@IgnorableOnReplayFailed
public static final short OP_DELETE_REPLICATION_JOB = 13501;

@IgnorableOnReplayFailed
public static final short OP_DISABLE_TABLE_RECOVERY = 13510;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,6 @@ public Collection<ReplicationJob> getAbortedJobs() {
return abortedJobs.values();
}

public void clearFinishedJobs() {
committedJobs.clear();
abortedJobs.clear();
GlobalStateMgr.getServingState().getEditLog().logReplicationJob(null);
}

public void cancelRunningJobs() {
List<ReplicationJob> toRemovedJobs = Lists.newArrayList();
for (ReplicationJob job : runningJobs.values()) {
Expand Down Expand Up @@ -161,28 +155,24 @@ public void finishReplicateSnapshotTask(ReplicateSnapshotTask task, TFinishTaskR
}

public void replayReplicationJob(ReplicationJob replicationJob) {
if (replicationJob == null) {
committedJobs.clear();
abortedJobs.clear();
return;
if (replicationJob.getState().equals(ReplicationJobState.COMMITTED)) {
committedJobs.put(replicationJob.getTableId(), replicationJob);
runningJobs.remove(replicationJob.getTableId());
} else if (replicationJob.getState().equals(ReplicationJobState.ABORTED)) {
abortedJobs.put(replicationJob.getTableId(), replicationJob);
runningJobs.remove(replicationJob.getTableId());
} else {
runningJobs.put(replicationJob.getTableId(), replicationJob);
}
}

public void replayDeleteReplicationJob(ReplicationJob replicationJob) {
if (replicationJob.getState().equals(ReplicationJobState.COMMITTED)) {
if (replicationJob.isExpired()) {
committedJobs.remove(replicationJob.getTableId());
} else {
committedJobs.put(replicationJob.getTableId(), replicationJob);
runningJobs.remove(replicationJob.getTableId());
}
committedJobs.remove(replicationJob.getTableId());
} else if (replicationJob.getState().equals(ReplicationJobState.ABORTED)) {
if (replicationJob.isExpired()) {
abortedJobs.remove(replicationJob.getTableId());
} else {
abortedJobs.put(replicationJob.getTableId(), replicationJob);
runningJobs.remove(replicationJob.getTableId());
}
abortedJobs.remove(replicationJob.getTableId());
} else {
runningJobs.put(replicationJob.getTableId(), replicationJob);
runningJobs.remove(replicationJob.getTableId());
}
}

Expand Down Expand Up @@ -229,7 +219,7 @@ private void clearExpiredJobs() {
continue;
}

GlobalStateMgr.getServingState().getEditLog().logReplicationJob(job);
GlobalStateMgr.getServingState().getEditLog().logDeleteReplicationJob(job);
it.remove();
}

Expand All @@ -239,7 +229,7 @@ private void clearExpiredJobs() {
continue;
}

GlobalStateMgr.getServingState().getEditLog().logReplicationJob(job);
GlobalStateMgr.getServingState().getEditLog().logDeleteReplicationJob(job);
it.remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public void testRecoverableOperations() {
Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_DROP_DICTIONARY));
Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_MODIFY_DICTIONARY_MGR));
Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_REPLICATION_JOB));
Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_DELETE_REPLICATION_JOB));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public void testNormal() throws Exception {
replicationMgr.runAfterCatalogReady();
Assert.assertTrue(replicationMgr.getCommittedJobs().isEmpty());

replicationMgr.replayReplicationJob(job);
replicationMgr.replayDeleteReplicationJob(job);
Assert.assertTrue(replicationMgr.getCommittedJobs().isEmpty());

Config.history_job_keep_max_second = old;
Expand All @@ -196,9 +196,6 @@ public void testInitializingCancel() {

Assert.assertTrue(replicationMgr.getRunningJobs().isEmpty());
Assert.assertFalse(replicationMgr.getAbortedJobs().isEmpty());

replicationMgr.clearFinishedJobs();
replicationMgr.replayReplicationJob(null);
}

@Test
Expand All @@ -224,7 +221,7 @@ public void testSnapshotingCancel() {
replicationMgr.runAfterCatalogReady();
Assert.assertTrue(replicationMgr.getAbortedJobs().isEmpty());

replicationMgr.replayReplicationJob(job);
replicationMgr.replayDeleteReplicationJob(job);
Assert.assertTrue(replicationMgr.getAbortedJobs().isEmpty());

Config.history_job_keep_max_second = old;
Expand Down

0 comments on commit fce4ce5

Please sign in to comment.