diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 86cf8085a42b10..e2eef7966be0d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -121,34 +121,7 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) { allBinlogs.add(binlog); binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog); - - if (binlog.getType() == TBinlogType.DROP_PARTITION) { - DropPartitionInfo info = DropPartitionInfo.fromJson(binlog.data); - if (info != null && info.getPartitionId() > 0) { - droppedPartitions.add(Pair.of(info.getPartitionId(), binlog.getCommitSeq())); - } - } else if (binlog.getType() == TBinlogType.DROP_TABLE) { - DropTableRecord record = DropTableRecord.fromJson(binlog.data); - if (record != null && record.getTableId() > 0) { - droppedTables.add(Pair.of(record.getTableId(), binlog.getCommitSeq())); - } - } else if (binlog.getType() == TBinlogType.ALTER_JOB) { - AlterJobRecord record = AlterJobRecord.fromJson(binlog.data); - if (record != null && record.isSchemaChangeJob() && record.isJobFinished()) { - for (Long indexId : record.getOriginIndexIdList()) { - if (indexId != null && indexId > 0) { - droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq())); - } - } - } - } else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE) { - TruncateTableRecord record = TruncateTableRecord.fromJson(binlog.data); - if (record != null) { - for (long partitionId : record.getOldPartitionIds()) { - droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq())); - } - } - } + recordDroppedResources(binlog); if (tableIds == null) { return; @@ -202,31 +175,7 @@ public void addBinlog(TBinlog binlog, Object raw) { return; } - if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) { - long partitionId = ((DropPartitionInfo) raw).getPartitionId(); - if (partitionId > 0) { - droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq())); - } - } else if (binlog.getType() == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) { - long tableId = ((DropTableRecord) raw).getTableId(); - if (tableId > 0) { - droppedTables.add(Pair.of(tableId, binlog.getCommitSeq())); - } - } else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) { - AlterJobRecord alterJobRecord = (AlterJobRecord) raw; - if (alterJobRecord.isJobFinished() && alterJobRecord.isSchemaChangeJob()) { - for (Long indexId : alterJobRecord.getOriginIndexIdList()) { - if (indexId != null && indexId > 0) { - droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq())); - } - } - } - } else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) { - TruncateTableRecord truncateTableRecord = (TruncateTableRecord) raw; - for (long partitionId : truncateTableRecord.getOldPartitionIds()) { - droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq())); - } - } + recordDroppedResources(binlog, raw); switch (binlog.getType()) { case CREATE_TABLE: @@ -670,4 +619,59 @@ public void getBinlogInfo(BaseProcResult result) { lock.readLock().unlock(); } } + + private void recordDroppedResources(TBinlog binlog) { + recordDroppedResources(binlog, null); + } + + // A method to record the dropped tables, indexes, and partitions. + private void recordDroppedResources(TBinlog binlog, Object raw) { + if (raw == null) { + switch (binlog.getType()) { + case DROP_PARTITION: + raw = DropPartitionInfo.fromJson(binlog.data); + break; + case DROP_TABLE: + raw = DropTableRecord.fromJson(binlog.data); + break; + case ALTER_JOB: + raw = AlterJobRecord.fromJson(binlog.data); + break; + case TRUNCATE_TABLE: + raw = TruncateTableRecord.fromJson(binlog.data); + break; + default: + break; + } + if (raw == null) { + return; + } + } + + if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) { + long partitionId = ((DropPartitionInfo) raw).getPartitionId(); + if (partitionId > 0) { + droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq())); + } + } else if (binlog.getType() == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) { + long tableId = ((DropTableRecord) raw).getTableId(); + if (tableId > 0) { + droppedTables.add(Pair.of(tableId, binlog.getCommitSeq())); + } + } else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) { + AlterJobRecord alterJobRecord = (AlterJobRecord) raw; + if (alterJobRecord.isJobFinished() && alterJobRecord.isSchemaChangeJob()) { + for (Long indexId : alterJobRecord.getOriginIndexIdList()) { + if (indexId != null && indexId > 0) { + droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq())); + } + } + } + } else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) { + TruncateTableRecord truncateTableRecord = (TruncateTableRecord) raw; + for (long partitionId : truncateTableRecord.getOldPartitionIds()) { + droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq())); + } + } + } }