Skip to content

Commit d1ed964

Browse files
authored
core: Asyncify commitlog compressor (#2743)
1 parent d7d5306 commit d1ed964

File tree

1 file changed

+20
-19
lines changed

1 file changed

+20
-19
lines changed

crates/core/src/db/relational_db.rs

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,25 +1409,26 @@ pub async fn snapshot_watching_commitlog_compressor(
14091409
let mut prev_snapshot_offset = *snapshot_rx.borrow_and_update();
14101410
while snapshot_rx.changed().await.is_ok() {
14111411
let snapshot_offset = *snapshot_rx.borrow_and_update();
1412-
let Ok(segment_offsets) = durability
1413-
.existing_segment_offsets()
1414-
.inspect_err(|e| tracing::warn!("failed to find offsets: {e}"))
1415-
else {
1416-
continue;
1417-
};
1418-
let start_idx = segment_offsets
1419-
.binary_search(&prev_snapshot_offset)
1420-
// if the snapshot is in the middle of a segment, we want to round down.
1421-
// [0, 2].binary_search(1) will return Err(1), so we subtract 1.
1422-
.unwrap_or_else(|i| i.saturating_sub(1));
1423-
let segment_offsets = &segment_offsets[start_idx..];
1424-
let end_idx = segment_offsets
1425-
.binary_search(&snapshot_offset)
1426-
.unwrap_or_else(|i| i.saturating_sub(1));
1427-
// in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
1428-
// which we don't want to compress, so an exclusive range is correct.
1429-
let segment_offsets = &segment_offsets[..end_idx];
1430-
if let Err(e) = durability.compress_segments(segment_offsets) {
1412+
let durability = durability.clone();
1413+
let res = asyncify(move || {
1414+
let segment_offsets = durability.existing_segment_offsets()?;
1415+
let start_idx = segment_offsets
1416+
.binary_search(&prev_snapshot_offset)
1417+
// if the snapshot is in the middle of a segment, we want to round down.
1418+
// [0, 2].binary_search(1) will return Err(1), so we subtract 1.
1419+
.unwrap_or_else(|i| i.saturating_sub(1));
1420+
let segment_offsets = &segment_offsets[start_idx..];
1421+
let end_idx = segment_offsets
1422+
.binary_search(&snapshot_offset)
1423+
.unwrap_or_else(|i| i.saturating_sub(1));
1424+
// in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
1425+
// which we don't want to compress, so an exclusive range is correct.
1426+
let segment_offsets = &segment_offsets[..end_idx];
1427+
durability.compress_segments(segment_offsets)
1428+
})
1429+
.await;
1430+
1431+
if let Err(e) = res {
14311432
tracing::warn!("failed to compress segments: {e}");
14321433
continue;
14331434
}

0 commit comments

Comments
 (0)