Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale committed Oct 15, 2024
1 parent 60c08be commit 541daa5
Showing 1 changed file with 42 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -496,19 +496,24 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runnable onCompletion) {
List<String> translogFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFileName = Translog.getFilename(generation);
if (isTranslogMetadataEnabled == false) {
translogFiles.addAll(List.of(ckpFileName, translogFileName));
} else {
translogFiles.add(translogFileName);
}
});
// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
try {
List<String> translogFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFileName = Translog.getFilename(generation);
if (isTranslogMetadataEnabled == false) {
translogFiles.addAll(List.of(ckpFileName, translogFileName));
} else {
translogFiles.add(translogFileName);
}
});
// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
} catch (Exception e) {
onCompletion.run();
throw e;
}
}

/**
Expand Down Expand Up @@ -658,37 +663,32 @@ public void deleteTranslogFiles() throws IOException {
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
private void deleteTranslogFilesAsync(long primaryTerm, List<String> files, Runnable onCompletion) {
try {
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
files,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
fileTransferTracker.delete(files);
logger.trace("Deleted translogs for primaryTerm={} files={}", primaryTerm, files);
onCompletion.run();
}
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
files,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
fileTransferTracker.delete(files);
logger.trace("Deleted translogs for primaryTerm={} files={}", primaryTerm, files);
onCompletion.run();
}

@Override
public void onFailure(Exception e) {
onCompletion.run();
logger.error(
() -> new ParameterizedMessage(
"Exception occurred while deleting translog for primaryTerm={} files={}",
primaryTerm,
files
),
e
);
}
@Override
public void onFailure(Exception e) {
onCompletion.run();
logger.error(
() -> new ParameterizedMessage(
"Exception occurred while deleting translog for primaryTerm={} files={}",
primaryTerm,
files
),
e
);
}
);
} catch (Exception e) {
onCompletion.run();
throw e;
}
}
);
}

/**
Expand Down

0 comments on commit 541daa5

Please sign in to comment.