From b59169e7a2da4a4fd9114285241cff660bcc2739 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 1 Oct 2024 17:29:49 +0200 Subject: [PATCH 01/40] fix: maybe #63 --- src/compaction/leveled.rs | 10 +++++++++- src/compaction/worker.rs | 1 + 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index fe80d57d..6ba800ed 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -160,7 +160,15 @@ impl CompactionStrategy for Strategy { target_size: u64::from(self.target_size), }; - if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { + // NOTE: We purposefully not trivially move segments + // if we go from L1 to L2 + // https://github.com/fjall-rs/lsm-tree/issues/63 + let goes_into_cold_storage = next_level_index == 2; + + if next_level_overlapping_segment_ids.is_empty() + && level.is_disjoint + && !goes_into_cold_storage + { return Choice::Move(choice); } return Choice::Merge(choice); diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index 079608dd..2a21606e 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -80,6 +80,7 @@ pub fn do_compaction(opts: &Options) -> crate::Result<()> { let choice = opts.strategy.choose(&original_levels, &opts.config); log::debug!("compactor: choice: {choice:?}"); + eprintln!("{original_levels}"); match choice { Choice::Merge(payload) => merge_segments(original_levels, opts, &payload), From ec159995ea245ec3dd06ebfb5e6c7a15db7edd0c Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 1 Oct 2024 18:24:04 +0200 Subject: [PATCH 02/40] remove log --- src/compaction/worker.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index 2a21606e..079608dd 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -80,7 +80,6 @@ pub fn do_compaction(opts: &Options) -> crate::Result<()> { let choice = opts.strategy.choose(&original_levels, &opts.config); log::debug!("compactor: choice: {choice:?}"); - eprintln!("{original_levels}"); match choice { Choice::Merge(payload) => merge_segments(original_levels, opts, &payload), From a4d3d5c350711dbce50d5d027ac7053082286dff Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 1 Oct 2024 22:15:07 +0200 Subject: [PATCH 03/40] use lz4 "unsafe" mode --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index d9569e0b..1476654b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ double-ended-peekable = "0.1.0" enum_dispatch = "0.3.13" guardian = "1.1.0" log = "0.4.22" -lz4_flex = { version = "0.11.3", optional = true } +lz4_flex = { version = "0.11.3", optional = true, default-features = false } miniz_oxide = { version = "0.8.0", optional = true } path-absolutize = "3.1.1" quick_cache = { version = "0.6.5", default-features = false, features = [] } From ded10827f323e08dc2f4f38b8c4044cc62cd5946 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sun, 13 Oct 2024 20:15:58 +0200 Subject: [PATCH 04/40] merge from main --- src/compaction/leveled.rs | 99 ++++++++++++++++++++++++++++++++++++--- src/compaction/worker.rs | 2 + 2 files changed, 94 insertions(+), 7 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 4b173bec..4e07836e 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -4,7 +4,11 @@ use super::{Choice, CompactionStrategy, Input as CompactionInput}; use crate::{ - config::Config, key_range::KeyRange, level_manifest::LevelManifest, segment::Segment, HashSet, + config::Config, + key_range::KeyRange, + level_manifest::{level::Level, LevelManifest}, + segment::Segment, + HashSet, SegmentId, }; use std::sync::Arc; @@ -64,6 +68,82 @@ fn desired_level_size_in_bytes(level_idx: u8, ratio: u8, target_size: u32) -> us (ratio as usize).pow(u32::from(level_idx)) * (target_size as usize) } +fn pick_minimal_overlap( + curr_level: &Level, + next_level: &Level, + overshoot: u64, +) -> (HashSet, bool) { + let mut choices = vec![]; + + for size in 1..=curr_level.len() { + let windows = curr_level.windows(size); + + for window in windows { + let size_sum = window.iter().map(|x| x.metadata.file_size).sum::(); + + if size_sum >= overshoot { + // NOTE: Consider this window + + let mut segment_ids: HashSet = + window.iter().map(|x| x.metadata.id).collect(); + + // Get overlapping segments in next level + let key_range = aggregate_key_range(window); + + let next_level_overlapping_segments: Vec<_> = next_level + .overlapping_segments(&key_range) + .cloned() + .collect(); + + // Get overlapping segments in same level + let key_range = aggregate_key_range(&next_level_overlapping_segments); + + let curr_level_overlapping_segment_ids: Vec<_> = curr_level + .overlapping_segments(&key_range) + .filter(|x| !segment_ids.contains(&x.metadata.id)) + .collect(); + + // Calculate effort + let size_next_level = next_level_overlapping_segments + .iter() + .map(|x| x.metadata.file_size) + .sum::(); + + let size_curr_level = curr_level_overlapping_segment_ids + .iter() + .map(|x| x.metadata.file_size) + .sum::(); + + let effort = size_sum + size_next_level + size_curr_level; + + segment_ids.extend( + next_level_overlapping_segments + .iter() + .map(|x| x.metadata.id), + ); + + segment_ids.extend( + curr_level_overlapping_segment_ids + .iter() + .map(|x| x.metadata.id), + ); + + // TODO: need to calculate write_amp and choose minimum write_amp instead + choices.push(( + effort, + segment_ids, + next_level_overlapping_segments.is_empty(), + )); + } + } + } + + let minimum_effort_choice = choices.into_iter().min_by(|a, b| a.0.cmp(&b.0)); + let (_, set, can_trivial_move) = minimum_effort_choice.expect("should exist"); + + (set, can_trivial_move) +} + impl CompactionStrategy for Strategy { #[allow(clippy::too_many_lines)] fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice { @@ -99,15 +179,20 @@ impl CompactionStrategy for Strategy { continue; } - let curr_level_bytes = level.size(); - let desired_bytes = desired_level_size_in_bytes(curr_level_index, self.level_ratio, self.target_size); - let mut overshoot = curr_level_bytes.saturating_sub(desired_bytes as u64); + let overshoot = level.size().saturating_sub(desired_bytes as u64); if overshoot > 0 { - let mut segments_to_compact = vec![]; + let Some(next_level) = &resolved_view.get(next_level_index as usize) else { + break; + }; + + let (segment_ids, can_trivial_move) = + pick_minimal_overlap(level, next_level, overshoot); + + /* let mut segments_to_compact = vec![]; let mut level = level.clone(); level.sort_by_key_range(); // TODO: disjoint levels shouldn't need sort @@ -148,7 +233,7 @@ impl CompactionStrategy for Strategy { .map(|x| x.metadata.id) .collect(); - segment_ids.extend(&next_level_overlapping_segment_ids); + segment_ids.extend(&next_level_overlapping_segment_ids); */ let choice = CompactionInput { segment_ids: { @@ -160,7 +245,7 @@ impl CompactionStrategy for Strategy { target_size: u64::from(self.target_size), }; - if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { + if can_trivial_move && level.is_disjoint { return Choice::Move(choice); } return Choice::Merge(choice); diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index 2b341752..d4cdf586 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -164,6 +164,8 @@ fn merge_segments( let last_level = levels.last_level_index(); levels.hide_segments(&payload.segment_ids); + + println!("--- comp: {levels}"); drop(levels); // NOTE: Only evict tombstones when reaching the last level, From 95f576b8018d4fb579a975dc4a971a9bd796bdda Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sun, 13 Oct 2024 20:39:45 +0200 Subject: [PATCH 05/40] fix --- src/compaction/worker.rs | 1 - src/key_range.rs | 10 +++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index d4cdf586..68f390eb 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -165,7 +165,6 @@ fn merge_segments( levels.hide_segments(&payload.segment_ids); - println!("--- comp: {levels}"); drop(levels); // NOTE: Only evict tombstones when reaching the last level, diff --git a/src/key_range.rs b/src/key_range.rs index 7afc7549..c09bfc5c 100644 --- a/src/key_range.rs +++ b/src/key_range.rs @@ -117,14 +117,10 @@ impl KeyRange { } /// Aggregates a key range. - /// - /// # Panics - /// - /// The iterator must not be empty pub fn aggregate<'a>(mut iter: impl Iterator) -> Self { - // NOTE: See function documentation - #[allow(clippy::expect_used)] - let first = iter.next().expect("should not be empty"); + let Some(first) = iter.next() else { + return Self::empty(); + }; let mut min = first.min(); let mut max = first.max(); From f8d7a19c024bd399224e1bf78dd9f9e93a6627f2 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sun, 13 Oct 2024 22:35:13 +0200 Subject: [PATCH 06/40] increase default level ratio --- src/compaction/leveled.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 4e07836e..cfe65479 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -55,7 +55,7 @@ impl Default for Strategy { Self { l0_threshold: 4, target_size: 64 * 1_024 * 1_024, - level_ratio: 8, + level_ratio: 10, } } } From 7d8913ec8d520d18c7c9205781c3c74f8aa50472 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sun, 13 Oct 2024 22:36:02 +0200 Subject: [PATCH 07/40] comment --- src/compaction/leveled.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index cfe65479..b8f85eb7 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -55,7 +55,7 @@ impl Default for Strategy { Self { l0_threshold: 4, target_size: 64 * 1_024 * 1_024, - level_ratio: 10, + level_ratio: 8, // TODO: benchmark vs 10 } } } From 8f9eb2b243531919d32ff464ff4900ed3075853c Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sun, 20 Oct 2024 21:02:04 +0200 Subject: [PATCH 08/40] fix: leveled compaction tests --- src/compaction/leveled.rs | 112 +++++++++++++++----------------------- src/segment/block/mod.rs | 9 +++ 2 files changed, 53 insertions(+), 68 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index b8f85eb7..534052ed 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -129,6 +129,17 @@ fn pick_minimal_overlap( ); // TODO: need to calculate write_amp and choose minimum write_amp instead + // + // consider the segments in La = A to be the ones in the window + // and the segments in La+1 B to be the ones that overlap + // and r = A / B + // we want to avoid compactions that have a low ratio r + // because that means we don't clear out a lot of segments in La + // but have to rewrite a lot of segments in La+1 + // + // ultimately, we want the highest ratio + // to maximize the amount of segments we are getting rid of in La + // for the least amount of effort choices.push(( effort, segment_ids, @@ -192,49 +203,6 @@ impl CompactionStrategy for Strategy { let (segment_ids, can_trivial_move) = pick_minimal_overlap(level, next_level, overshoot); - /* let mut segments_to_compact = vec![]; - - let mut level = level.clone(); - level.sort_by_key_range(); // TODO: disjoint levels shouldn't need sort - - for segment in level.iter().take(self.level_ratio.into()).cloned() { - if overshoot == 0 { - break; - } - - overshoot = overshoot.saturating_sub(segment.metadata.file_size); - segments_to_compact.push(segment); - } - - debug_assert!(!segments_to_compact.is_empty()); - - let Some(next_level) = &resolved_view.get(next_level_index as usize) else { - break; - }; - - let mut segment_ids: HashSet = - segments_to_compact.iter().map(|x| x.metadata.id).collect(); - - // Get overlapping segments in same level - let key_range = aggregate_key_range(&segments_to_compact); - - let curr_level_overlapping_segment_ids: Vec<_> = level - .overlapping_segments(&key_range) - .map(|x| x.metadata.id) - .collect(); - - segment_ids.extend(&curr_level_overlapping_segment_ids); - - // Get overlapping segments in next level - let key_range = aggregate_key_range(&segments_to_compact); - - let next_level_overlapping_segment_ids: Vec<_> = next_level - .overlapping_segments(&key_range) - .map(|x| x.metadata.id) - .collect(); - - segment_ids.extend(&next_level_overlapping_segment_ids); */ - let choice = CompactionInput { segment_ids: { let mut v = segment_ids.into_iter().collect::>(); @@ -383,7 +351,7 @@ mod tests { #[allow(clippy::expect_used)] fn build_levels( path: &Path, - recipe: Vec>, + recipe: Vec>, ) -> crate::Result { let mut levels = LevelManifest::create_new( recipe.len().try_into().expect("oopsie"), @@ -391,10 +359,15 @@ mod tests { )?; for (idx, level) in recipe.into_iter().enumerate() { - for (id, min, max) in level { + for (id, min, max, size_mib) in level { levels.insert_into_level( idx.try_into().expect("oopsie"), - fixture_segment(id, string_key_range(min, max), 64 * 1_024 * 1_024, 0.0), + fixture_segment( + id, + string_key_range(min, max), + size_mib * 1_024 * 1_024, + 0.0, + ), ); } } @@ -433,7 +406,7 @@ mod tests { #[rustfmt::skip] let mut levels = build_levels(tempdir.path(), vec![ - vec![(1, "a", "z"), (2, "a", "z"), (3, "a", "z"), (4, "a", "z")], + vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)], vec![], vec![], vec![], @@ -468,8 +441,8 @@ mod tests { #[rustfmt::skip] let levels = build_levels(tempdir.path(), vec![ - vec![(1, "h", "t"), (2, "h", "t"), (3, "h", "t"), (4, "h", "t")], - vec![(5, "a", "g"), (6, "a", "g"), (7, "a", "g"), (8, "a", "g")], + vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)], + vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)], vec![], vec![], ])?; @@ -496,8 +469,8 @@ mod tests { #[rustfmt::skip] let mut levels = build_levels(tempdir.path(), vec![ - vec![(1, "a", "g"), (2, "h", "t"), (3, "i", "t"), (4, "j", "t")], - vec![(5, "a", "g"), (6, "a", "g"), (7, "y", "z"), (8, "y", "z")], + vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)], + vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)], vec![], vec![], ])?; @@ -533,16 +506,17 @@ mod tests { #[rustfmt::skip] let levels = build_levels(tempdir.path(), vec![ vec![], - vec![(1, "a", "g"), (2, "h", "t"), (3, "x", "z")], - vec![(4, "f", "l")], + vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "x", "z", 64)], + vec![(4, "f", "l", 64)], vec![], ])?; assert_eq!( compactor.choose(&levels, &config), - Choice::Merge(CompactionInput { + Choice::Move(CompactionInput { dest_level: 2, - segment_ids: vec![1, 4], + // NOTE: segment #3 has no overlap with L2 + segment_ids: vec![3], target_size: 64 * 1_024 * 1_024 }) ); @@ -563,8 +537,8 @@ mod tests { #[rustfmt::skip] let levels = build_levels(tempdir.path(), vec![ vec![], - vec![(1, "a", "g"), (2, "h", "j"), (3, "k", "t")], - vec![(4, "k", "l")], + vec![(1, "a", "g", 64), (2, "h", "j", 64), (3, "k", "t", 64)], + vec![(4, "k", "l", 64)], vec![], ])?; @@ -594,15 +568,16 @@ mod tests { let levels = build_levels(tempdir.path(), vec![ vec![], vec![], - vec![(1, "a", "g"), (2, "a", "g"), (3, "a", "g"), (4, "a", "g"), (5, "y", "z")], - vec![(6, "f", "l")], + vec![(1, "a", "g", 64), (2, "a", "g", 64), (3, "a", "g", 64), (4, "a", "g", 64), (5, "y", "z", 64)], + vec![(6, "f", "l", 64)], ])?; assert_eq!( compactor.choose(&levels, &config), Choice::Merge(CompactionInput { dest_level: 3, - segment_ids: vec![1, 2, 3, 4, 6], + // NOTE: 5 is the only segment that has no overlap with #3 + segment_ids: vec![5], target_size: 64 * 1_024 * 1_024 }) ); @@ -624,15 +599,16 @@ mod tests { let levels = build_levels(tempdir.path(), vec![ vec![], vec![], - vec![(1, "a", "g"), (2, "h", "j"), (3, "k", "l"), (4, "m", "n"), (5, "y", "z")], - vec![(6, "f", "l")], + vec![(1, "a", "g", 64), (2, "h", "j", 64), (3, "k", "l", 64), (4, "m", "n", 64), (5, "y", "z", 64)], + vec![(6, "f", "l", 64)], ])?; assert_eq!( compactor.choose(&levels, &config), - Choice::Merge(CompactionInput { + Choice::Move(CompactionInput { dest_level: 3, - segment_ids: vec![1, 6], + // NOTE: segment #4 is the left-most segment that has no overlap with L3 + segment_ids: vec![4], target_size: 64 * 1_024 * 1_024 }) ); @@ -654,8 +630,8 @@ mod tests { let levels = build_levels(tempdir.path(), vec![ vec![], vec![], - vec![(1, "a", "g"), (2, "h", "j"), (3, "k", "l"), (4, "m", "n"), (5, "y", "z")], - vec![(6, "w", "x")], + vec![(1, "a", "g", 64), (2, "h", "j", 64), (3, "k", "l", 64), (4, "m", "n", 64), (5, "y", "z", 64)], + vec![(6, "w", "x", 64)], ])?; assert_eq!( @@ -683,8 +659,8 @@ mod tests { #[rustfmt::skip] let levels = build_levels(tempdir.path(), vec![ vec![], - vec![(1, "a", "z"), (2, "a", "z"), (3, "g", "z")], - vec![(4, "a", "g")], + vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64)], + vec![(4, "a", "g", 64)], vec![], ])?; diff --git a/src/segment/block/mod.rs b/src/segment/block/mod.rs index eb5fa388..ddef8a38 100644 --- a/src/segment/block/mod.rs +++ b/src/segment/block/mod.rs @@ -44,9 +44,13 @@ impl Block { let header = BlockHeader::decode_from(reader)?; log::trace!("Got block header: {header:?}"); + // Read the (possibly compressed) data let mut bytes = vec![0u8; header.data_length as usize]; reader.read_exact(&mut bytes)?; + // TODO: 3.0.0 when header.compressed is reliable + // can we preallocate a vector to stream the compression into? + // -> saves reallocation costs let bytes = match header.compression { super::meta::CompressionType::None => bytes, @@ -103,6 +107,9 @@ impl Block { #[allow(clippy::cast_possible_truncation)] data_length: packed.len() as u32, + // TODO: 3.0.0 pack_items should return the uncompressed, serialized + // size directly + // NOTE: Truncation is OK because a block cannot possible contain 4 billion items #[allow(clippy::cast_possible_truncation)] uncompressed_length: items.size() as u32, @@ -123,6 +130,8 @@ impl Block { value.encode_into(&mut buf)?; } + // TODO: 3.0.0 return buf.len() - 4 as uncompressed size + Ok(match compression { CompressionType::None => buf, From e777808cbc800ff96fff5de270fe7b46d2e0da17 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Mon, 21 Oct 2024 21:42:03 +0200 Subject: [PATCH 09/40] perf: point read degradation in disjoint trees --- src/level_manifest/level.rs | 6 +++++- src/segment/mod.rs | 6 +++++- src/tree/mod.rs | 3 --- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/level_manifest/level.rs b/src/level_manifest/level.rs index 2b14cdfd..1cd41264 100644 --- a/src/level_manifest/level.rs +++ b/src/level_manifest/level.rs @@ -153,7 +153,11 @@ impl<'a> DisjointLevel<'a> { .segments .partition_point(|x| &*x.metadata.key_range.1 < key.as_ref()); - level.segments.get(idx).cloned() + level + .segments + .get(idx) + .filter(|x| x.is_key_in_key_range(key)) + .cloned() } pub fn range_indexes( diff --git a/src/segment/mod.rs b/src/segment/mod.rs index 1bdbe4b4..d1534cda 100644 --- a/src/segment/mod.rs +++ b/src/segment/mod.rs @@ -344,6 +344,10 @@ impl Segment { Ok(Some(entry)) } + pub fn is_key_in_key_range>(&self, key: K) -> bool { + self.metadata.key_range.contains_key(key) + } + // NOTE: Clippy false positive #[allow(unused)] /// Retrieves an item from the segment. @@ -362,7 +366,7 @@ impl Segment { } } - if !self.metadata.key_range.contains_key(&key) { + if !self.is_key_in_key_range(&key) { return Ok(None); } diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 05dd4390..e8f35f1b 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -654,9 +654,6 @@ impl Tree { } return Ok(Some(item)); } - } else { - // NOTE: Don't go to fallback, go to next level instead - continue; } } } From 0a7ee5215a06d0f306bcd1063488a6fc5528d1b2 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Mon, 21 Oct 2024 23:00:42 +0200 Subject: [PATCH 10/40] fix: bench --- benches/level_manifest.rs | 6 +++++- src/tree/mod.rs | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/benches/level_manifest.rs b/benches/level_manifest.rs index b26e4d23..91277a29 100644 --- a/benches/level_manifest.rs +++ b/benches/level_manifest.rs @@ -5,6 +5,8 @@ fn iterate_segments(c: &mut Criterion) { let mut group = c.benchmark_group("Iterate level manifest"); group.sample_size(10); + std::fs::create_dir_all(".bench").unwrap(); + for segment_count in [0, 1, 5, 10, 100, 500, 1_000, 2_000, 4_000] { group.bench_function(format!("iterate {segment_count} segments"), |b| { let folder = tempfile::tempdir_in(".bench").unwrap(); @@ -28,7 +30,9 @@ fn find_segment(c: &mut Criterion) { let mut group = c.benchmark_group("Find segment in disjoint level"); group.sample_size(10); - for segment_count in [1u64, 5, 10, 100, 500, 1_000, 2_000, 4_000] { + std::fs::create_dir_all(".bench").unwrap(); + + for segment_count in [1u64, 4, 5, 10, 100, 500, 1_000, 2_000, 4_000] { group.bench_function( format!("find segment in {segment_count} segments - binary search"), |b| { diff --git a/src/tree/mod.rs b/src/tree/mod.rs index e8f35f1b..928abb3f 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -630,8 +630,8 @@ impl Tree { let level_manifest = self.levels.read().expect("lock is poisoned"); for level in &level_manifest.levels { - // NOTE: Based on benchmarking, binary search is only worth it after ~4 segments - if level.len() >= 5 { + // NOTE: Based on benchmarking, binary search is only worth it with ~4 segments + if level.len() >= 4 { if let Some(level) = level.as_disjoint() { // TODO: unit test in disjoint level: // [a:5, a:4] [a:3, b:5] From f9d3c60e9582070329b3226bcdf9901625325adc Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 22 Oct 2024 14:54:49 +0200 Subject: [PATCH 11/40] fix(point read): go to next level if disjoint level found no result --- src/tree/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 928abb3f..f696f56e 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -655,6 +655,9 @@ impl Tree { return Ok(Some(item)); } } + + // NOTE: Go to next level + continue; } } From 102ab8ce8fcb4959e7f4c1e182fe0369ebe5ebd9 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 22 Oct 2024 17:26:41 +0200 Subject: [PATCH 12/40] wip --- src/compaction/leveled.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 297c1997..5f8190e4 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -185,7 +185,7 @@ impl CompactionStrategy for Strategy { && !busy_levels.contains(&1) { let mut level = first_level.clone(); - level.sort_by_key_range(); // TODO: disjoint levels shouldn't need sort + level.sort_by_key_range(); let Some(next_level) = &resolved_view.get(1) else { return Choice::DoNothing; From 61d75e8a63b96fd64d21edf751e16b4396ec389a Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 22 Oct 2024 18:22:31 +0200 Subject: [PATCH 13/40] wip --- src/compaction/leveled.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 5f8190e4..95715588 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -160,15 +160,17 @@ impl CompactionStrategy for Strategy { target_size: u64::from(self.target_size), }; + // TODO: 3.0.0 configuration? // NOTE: We purposefully not trivially move segments // if we go from L1 to L2 // https://github.com/fjall-rs/lsm-tree/issues/63 let goes_into_cold_storage = next_level_index == 2; - if next_level_overlapping_segment_ids.is_empty() - && level.is_disjoint - && !goes_into_cold_storage - { + if goes_into_cold_storage { + return Choice::Merge(choice); + } + + if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { return Choice::Move(choice); } return Choice::Merge(choice); From eaf61aab915b3595788ebc0ed21d685414237384 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 22 Oct 2024 18:57:51 +0200 Subject: [PATCH 14/40] fix test --- src/compaction/leveled.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 95715588..ebb3295d 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -495,7 +495,9 @@ mod tests { assert_eq!( compactor.choose(&levels, &config), - Choice::Move(CompactionInput { + // NOTE: We merge because segments are demoted into "cold" levels + // see https://github.com/fjall-rs/lsm-tree/issues/63 + Choice::Merge(CompactionInput { dest_level: 2, segment_ids: vec![1], target_size: 64 * 1_024 * 1_024 From b1ee9c5ff2a7abebe2d8e0912a1ca0524a679d16 Mon Sep 17 00:00:00 2001 From: Marvin <33938500+marvin-j97@users.noreply.github.com> Date: Wed, 23 Oct 2024 23:16:29 +0200 Subject: [PATCH 15/40] Update leveled.rs --- src/compaction/leveled.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index ebb3295d..68bda7b4 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -160,6 +160,10 @@ impl CompactionStrategy for Strategy { target_size: u64::from(self.target_size), }; + // TODO: eventually, this should happen lazily + // if a segment file lives for very long, it should get rewritten + // Rocks, by default, rewrites files that are 1 month or older + // // TODO: 3.0.0 configuration? // NOTE: We purposefully not trivially move segments // if we go from L1 to L2 From f66709d32a04933fe4f52db7e1cee7ba35207e15 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Fri, 25 Oct 2024 14:55:24 +0200 Subject: [PATCH 16/40] add unit test --- src/level_manifest/level.rs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/level_manifest/level.rs b/src/level_manifest/level.rs index 1cd41264..3d5c4aa1 100644 --- a/src/level_manifest/level.rs +++ b/src/level_manifest/level.rs @@ -399,6 +399,38 @@ mod tests { Ok(()) } + #[test] + #[allow(clippy::unwrap_used)] + fn level_disjoint_containing_key() -> crate::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = crate::Config::new(&folder).open()?; + + for k in 'c'..'k' { + tree.insert([k as u8], "", 0); + tree.flush_active_memtable(0).expect("should flush"); + } + + let first = tree + .levels + .read() + .expect("lock is poisoned") + .levels + .first() + .expect("should exist") + .clone(); + + let dis = first.as_disjoint().unwrap(); + assert!(dis.get_segment_containing_key("a").is_none()); + assert!(dis.get_segment_containing_key("b").is_none()); + for k in 'c'..'k' { + assert!(dis.get_segment_containing_key([k as u8]).is_some()); + } + assert!(dis.get_segment_containing_key("l").is_none()); + + Ok(()) + } + #[test] fn level_not_disjoint() -> crate::Result<()> { let folder = tempfile::tempdir()?; From 82aa6e50f4084c08a542b32faf3a941cbda7cbc4 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Fri, 25 Oct 2024 19:03:47 +0200 Subject: [PATCH 17/40] perf: try to keep segment size close to target in disjoint workload --- src/compaction/leveled.rs | 42 +++++++++++++++++++++++++++++++-------- src/level_manifest/mod.rs | 4 +++- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index ebb3295d..34ebaa61 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -182,12 +182,37 @@ impl CompactionStrategy for Strategy { return Choice::DoNothing; }; - if first_level.len() >= self.l0_threshold.into() - && !busy_levels.contains(&0) - && !busy_levels.contains(&1) - { - let mut level = first_level.clone(); - level.sort_by_key_range(); + if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) { + if levels.is_disjoint() { + // NOTE: Special handling for disjoint workloads + + if first_level.size() < self.target_size.into() { + // NOTE: Force a merge into L0 itself + // ...we seem to have *very* small flushes + return if first_level.len() >= 32 { + Choice::Merge(CompactionInput { + dest_level: 0, + segment_ids: first_level + .segments + .iter() + .map(|x| x.metadata.id) + .collect(), + // NOTE: Allow a bit of overshooting + target_size: ((self.target_size as f32) * 1.1) as u64, + }) + } else { + Choice::DoNothing + }; + } + + return Choice::Merge(CompactionInput { + dest_level: 1, + segment_ids: first_level.segments.iter().map(|x| x.metadata.id).collect(), + target_size: ((self.target_size as f32) * 1.1) as u64, + }); + } else if !busy_levels.contains(&1) { + let mut level = first_level.clone(); + level.sort_by_key_range(); let Some(next_level) = &resolved_view.get(1) else { return Choice::DoNothing; @@ -213,9 +238,10 @@ impl CompactionStrategy for Strategy { }; if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { - return Choice::Move(choice); + return Choice::Move(choice); + } + return Choice::Merge(choice); } - return Choice::Merge(choice); } } diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index 75e0648d..db960be5 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -313,7 +313,9 @@ impl LevelManifest { #[must_use] pub fn is_disjoint(&self) -> bool { - self.is_disjoint && self.levels.iter().all(|x| x.is_disjoint) + self.is_disjoint + // vv TODO: not needed? + && self.levels.iter().all(|x| x.is_disjoint) } /// Returns `true` if there are no segments From 184f6eef0d8b22bf0428fffee8de9893d780516c Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Fri, 25 Oct 2024 19:05:50 +0200 Subject: [PATCH 18/40] fmt --- src/level_manifest/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index db960be5..d731ba55 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -313,9 +313,8 @@ impl LevelManifest { #[must_use] pub fn is_disjoint(&self) -> bool { - self.is_disjoint - // vv TODO: not needed? - && self.levels.iter().all(|x| x.is_disjoint) + self.is_disjoint && self.levels.iter().all(|x| x.is_disjoint) + // TODO: not needed? -----------^ } /// Returns `true` if there are no segments From 985f5fd8c221051063aff92ea4abb2ed7b42f4af Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Fri, 25 Oct 2024 20:33:09 +0200 Subject: [PATCH 19/40] fmt --- src/compaction/leveled.rs | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 04362c16..d3a17cec 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -218,30 +218,30 @@ impl CompactionStrategy for Strategy { let mut level = first_level.clone(); level.sort_by_key_range(); - let Some(next_level) = &resolved_view.get(1) else { - return Choice::DoNothing; - }; + let Some(next_level) = &resolved_view.get(1) else { + return Choice::DoNothing; + }; - let mut segment_ids: Vec = - level.iter().map(|x| x.metadata.id).collect::>(); + let mut segment_ids: Vec = + level.iter().map(|x| x.metadata.id).collect::>(); - // Get overlapping segments in next level - let key_range = aggregate_key_range(&level); + // Get overlapping segments in next level + let key_range = aggregate_key_range(&level); - let next_level_overlapping_segment_ids: Vec<_> = next_level - .overlapping_segments(&key_range) - .map(|x| x.metadata.id) - .collect(); + let next_level_overlapping_segment_ids: Vec<_> = next_level + .overlapping_segments(&key_range) + .map(|x| x.metadata.id) + .collect(); - segment_ids.extend(&next_level_overlapping_segment_ids); + segment_ids.extend(&next_level_overlapping_segment_ids); - let choice = CompactionInput { - segment_ids, - dest_level: 1, - target_size: u64::from(self.target_size), - }; + let choice = CompactionInput { + segment_ids, + dest_level: 1, + target_size: u64::from(self.target_size), + }; - if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { + if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { return Choice::Move(choice); } return Choice::Merge(choice); From a742daf0566f35437134b3120c70003210c46ae3 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Fri, 25 Oct 2024 20:49:07 +0200 Subject: [PATCH 20/40] add comment --- src/compaction/leveled.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index d3a17cec..f29823f2 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -191,6 +191,9 @@ impl CompactionStrategy for Strategy { // NOTE: Special handling for disjoint workloads if first_level.size() < self.target_size.into() { + // TODO: also do this in non-disjoint workloads + // -> intra-L0 compaction + // NOTE: Force a merge into L0 itself // ...we seem to have *very* small flushes return if first_level.len() >= 32 { From 07290e7ac5c8fafc7a2416ec8a7b77ff8322603b Mon Sep 17 00:00:00 2001 From: Marvin <33938500+marvin-j97@users.noreply.github.com> Date: Sat, 26 Oct 2024 20:01:17 +0200 Subject: [PATCH 21/40] Update Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index edcdf6fe..5e6f746b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "lsm-tree" description = "A K.I.S.S. implementation of log-structured merge trees (LSM-trees/LSMTs)" license = "MIT OR Apache-2.0" -version = "2.1.1" +version = "2.3.0" edition = "2021" rust-version = "1.74.0" readme = "README.md" From 800df8da12562492a4d0f194e7e4894a3c04ac00 Mon Sep 17 00:00:00 2001 From: Marvin <33938500+marvin-j97@users.noreply.github.com> Date: Sat, 26 Oct 2024 20:01:22 +0200 Subject: [PATCH 22/40] Update mod.rs --- src/level_manifest/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index d731ba55..75e0648d 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -314,7 +314,6 @@ impl LevelManifest { #[must_use] pub fn is_disjoint(&self) -> bool { self.is_disjoint && self.levels.iter().all(|x| x.is_disjoint) - // TODO: not needed? -----------^ } /// Returns `true` if there are no segments From d65f523b2258fc1c6d40772fe383fa7649d5d97b Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sat, 26 Oct 2024 21:58:33 +0200 Subject: [PATCH 23/40] intra-L0 compaction --- src/compaction/leveled.rs | 32 ++++++++++++++++++++++---------- src/level_manifest/level.rs | 4 ++++ 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index f29823f2..8b6553c7 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -187,10 +187,11 @@ impl CompactionStrategy for Strategy { }; if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) { - if levels.is_disjoint() { - // NOTE: Special handling for disjoint workloads + let first_level_size = first_level.size(); - if first_level.size() < self.target_size.into() { + // NOTE: Special handling for disjoint workloads + if levels.is_disjoint() { + if first_level_size < self.target_size.into() { // TODO: also do this in non-disjoint workloads // -> intra-L0 compaction @@ -199,11 +200,7 @@ impl CompactionStrategy for Strategy { return if first_level.len() >= 32 { Choice::Merge(CompactionInput { dest_level: 0, - segment_ids: first_level - .segments - .iter() - .map(|x| x.metadata.id) - .collect(), + segment_ids: first_level.list_ids(), // NOTE: Allow a bit of overshooting target_size: ((self.target_size as f32) * 1.1) as u64, }) @@ -214,10 +211,25 @@ impl CompactionStrategy for Strategy { return Choice::Merge(CompactionInput { dest_level: 1, - segment_ids: first_level.segments.iter().map(|x| x.metadata.id).collect(), + segment_ids: first_level.list_ids(), target_size: ((self.target_size as f32) * 1.1) as u64, }); - } else if !busy_levels.contains(&1) { + } + + /* let l0_threshold_size = (self.l0_threshold as u32) * self.target_size; + + if (first_level_size as f32) < (l0_threshold_size as f32) * 0.66 { + // NOTE: We reached the threshold, but L0 is still very small + // meaning we have very small segments, so do intra-L0 compaction + return Choice::Merge(CompactionInput { + dest_level: 0, + segment_ids: first_level.list_ids(), + // NOTE: Allow a bit of overshooting + target_size: self.target_size.into(), + }); + } */ + + if !busy_levels.contains(&1) { let mut level = first_level.clone(); level.sort_by_key_range(); diff --git a/src/level_manifest/level.rs b/src/level_manifest/level.rs index 2b14cdfd..4ddc7d21 100644 --- a/src/level_manifest/level.rs +++ b/src/level_manifest/level.rs @@ -47,6 +47,10 @@ impl Default for Level { } impl Level { + pub fn list_ids(&self) -> Vec { + self.segments.iter().map(|x| x.metadata.id).collect() + } + pub fn insert(&mut self, segment: Arc) { self.segments.push(segment); self.set_disjoint_flag(); From b4dbd2aed3bf33781d8d1dadc79f618a6cf7867d Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sat, 26 Oct 2024 23:01:19 +0200 Subject: [PATCH 24/40] wip --- src/compaction/leveled.rs | 8 +++----- src/level_manifest/mod.rs | 4 ++-- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index e9ce95ed..240e7a56 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -227,7 +227,7 @@ impl CompactionStrategy for Strategy { return Choice::Merge(choice); } - if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint { + if can_trivial_move && level.is_disjoint { return Choice::Move(choice); } return Choice::Merge(choice); @@ -269,9 +269,7 @@ impl CompactionStrategy for Strategy { }); } - /* let l0_threshold_size = (self.l0_threshold as u32) * self.target_size; - - if (first_level_size as f32) < (l0_threshold_size as f32) * 0.66 { + if first_level_size < self.target_size.into() { // NOTE: We reached the threshold, but L0 is still very small // meaning we have very small segments, so do intra-L0 compaction return Choice::Merge(CompactionInput { @@ -280,7 +278,7 @@ impl CompactionStrategy for Strategy { // NOTE: Allow a bit of overshooting target_size: self.target_size.into(), }); - } */ + } if !busy_levels.contains(&1) { let mut level = first_level.clone(); diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index 75e0648d..0e6af530 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -50,7 +50,7 @@ impl std::fmt::Display for LevelManifest { if level.segments.is_empty() { write!(f, "")?; - } else if level.segments.len() >= 24 { + } else if level.segments.len() >= 10 { #[allow(clippy::indexing_slicing)] for segment in level.segments.iter().take(2) { let id = segment.metadata.id; @@ -63,7 +63,7 @@ impl std::fmt::Display for LevelManifest { if is_hidden { ")" } else { "]" }, )?; } - write!(f, " . . . . . ")?; + write!(f, " . . . ")?; #[allow(clippy::indexing_slicing)] for segment in level.segments.iter().rev().take(2).rev() { From 226c94adab51a7ac50db7a781ab53b6d84d2eec8 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sat, 26 Oct 2024 23:21:39 +0200 Subject: [PATCH 25/40] fix test case --- src/compaction/leveled.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 240e7a56..f9f27201 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -566,9 +566,8 @@ mod tests { assert_eq!( compactor.choose(&levels, &config), - Choice::Move(CompactionInput { + Choice::Merge(CompactionInput { dest_level: 2, - // NOTE: segment #3 has no overlap with L2 segment_ids: vec![3], target_size: 64 * 1_024 * 1_024 }) From 3e80d2ca90abfba29d49f092ecc5958d72e87dd4 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sat, 26 Oct 2024 23:32:34 +0200 Subject: [PATCH 26/40] wip --- src/compaction/leveled.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index f9f27201..324c1bd4 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -275,7 +275,6 @@ impl CompactionStrategy for Strategy { return Choice::Merge(CompactionInput { dest_level: 0, segment_ids: first_level.list_ids(), - // NOTE: Allow a bit of overshooting target_size: self.target_size.into(), }); } From 2f61affd55637312dc5bad39d3e0002eaed2708b Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sun, 27 Oct 2024 00:38:33 +0200 Subject: [PATCH 27/40] refactor --- src/compaction/fifo.rs | 7 ++++--- src/compaction/leveled.rs | 34 +++++++++++++++------------------- src/compaction/maintenance.rs | 5 +++-- src/compaction/mod.rs | 6 +++--- src/compaction/pulldown.rs | 8 ++------ src/compaction/tiered.rs | 16 ++++++++-------- src/compaction/worker.rs | 11 ++++------- src/level_manifest/level.rs | 4 ++-- src/level_manifest/mod.rs | 8 ++++---- src/lib.rs | 7 +++++++ 10 files changed, 52 insertions(+), 54 deletions(-) diff --git a/src/compaction/fifo.rs b/src/compaction/fifo.rs index 7e10e72c..f7131d5c 100644 --- a/src/compaction/fifo.rs +++ b/src/compaction/fifo.rs @@ -109,7 +109,7 @@ impl CompactionStrategy for Strategy { let mut ids = segment_ids_to_delete.into_iter().collect::>(); ids.sort_unstable(); - Choice::Drop(ids) + Choice::Drop(ids.into_iter().collect::>()) } } } @@ -133,6 +133,7 @@ mod tests { Segment, }, time::unix_timestamp, + HashSet, }; use std::sync::Arc; use test_log::test; @@ -197,7 +198,7 @@ mod tests { assert_eq!( compactor.choose(&levels, &Config::default()), - Choice::Drop(vec![1]) + Choice::Drop(set![1]) ); Ok(()) @@ -265,7 +266,7 @@ mod tests { assert_eq!( compactor.choose(&levels, &Config::default()), - Choice::Drop(vec![1, 2]) + Choice::Drop([1, 2].into_iter().collect::>()) ); Ok(()) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 324c1bd4..d91ce345 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -204,11 +204,7 @@ impl CompactionStrategy for Strategy { pick_minimal_overlap(level, next_level, overshoot); let choice = CompactionInput { - segment_ids: { - let mut v = segment_ids.into_iter().collect::>(); - v.sort_unstable(); - v - }, + segment_ids, dest_level: next_level_index, target_size: u64::from(self.target_size), }; @@ -287,8 +283,8 @@ impl CompactionStrategy for Strategy { return Choice::DoNothing; }; - let mut segment_ids: Vec = - level.iter().map(|x| x.metadata.id).collect::>(); + let mut segment_ids: HashSet = + level.iter().map(|x| x.metadata.id).collect(); // Get overlapping segments in next level let key_range = aggregate_key_range(&level); @@ -335,7 +331,7 @@ mod tests { Segment, }, time::unix_timestamp, - Config, + Config, HashSet, }; use std::{path::Path, sync::Arc}; use test_log::test; @@ -468,12 +464,12 @@ mod tests { compactor.choose(&levels, &Config::default()), Choice::Merge(CompactionInput { dest_level: 1, - segment_ids: vec![1, 2, 3, 4], + segment_ids: [1, 2, 3, 4].into_iter().collect::>(), target_size: 64 * 1_024 * 1_024 }) ); - levels.hide_segments(&[4]); + levels.hide_segments(std::iter::once(4)); assert_eq!( compactor.choose(&levels, &Config::default()), @@ -503,7 +499,7 @@ mod tests { compactor.choose(&levels, &Config::default()), Choice::Merge(CompactionInput { dest_level: 1, - segment_ids: vec![1, 2, 3, 4], + segment_ids: [1, 2, 3, 4].into_iter().collect::>(), target_size: 64 * 1_024 * 1_024 }) ); @@ -531,12 +527,12 @@ mod tests { compactor.choose(&levels, &Config::default()), Choice::Merge(CompactionInput { dest_level: 1, - segment_ids: vec![1, 2, 3, 4, 5, 6], + segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::>(), target_size: 64 * 1_024 * 1_024 }) ); - levels.hide_segments(&[5]); + levels.hide_segments(std::iter::once(5)); assert_eq!( compactor.choose(&levels, &Config::default()), Choice::DoNothing @@ -567,7 +563,7 @@ mod tests { compactor.choose(&levels, &config), Choice::Merge(CompactionInput { dest_level: 2, - segment_ids: vec![3], + segment_ids: set![3], target_size: 64 * 1_024 * 1_024 }) ); @@ -599,7 +595,7 @@ mod tests { // see https://github.com/fjall-rs/lsm-tree/issues/63 Choice::Merge(CompactionInput { dest_level: 2, - segment_ids: vec![1], + segment_ids: set![1], target_size: 64 * 1_024 * 1_024 }) ); @@ -630,7 +626,7 @@ mod tests { Choice::Merge(CompactionInput { dest_level: 3, // NOTE: 5 is the only segment that has no overlap with #3 - segment_ids: vec![5], + segment_ids: set![5], target_size: 64 * 1_024 * 1_024 }) ); @@ -661,7 +657,7 @@ mod tests { Choice::Move(CompactionInput { dest_level: 3, // NOTE: segment #4 is the left-most segment that has no overlap with L3 - segment_ids: vec![4], + segment_ids: set![4], target_size: 64 * 1_024 * 1_024 }) ); @@ -691,7 +687,7 @@ mod tests { compactor.choose(&levels, &config), Choice::Move(CompactionInput { dest_level: 3, - segment_ids: vec![1], + segment_ids: set![1], target_size: 64 * 1_024 * 1_024 }) ); @@ -721,7 +717,7 @@ mod tests { compactor.choose(&levels, &config), Choice::Merge(CompactionInput { dest_level: 2, - segment_ids: vec![1, 2, 3, 4], + segment_ids: [1, 2, 3, 4].into_iter().collect::>(), target_size: 64 * 1_024 * 1_024 }) ); diff --git a/src/compaction/maintenance.rs b/src/compaction/maintenance.rs index 8ec83509..4a8e3703 100644 --- a/src/compaction/maintenance.rs +++ b/src/compaction/maintenance.rs @@ -7,6 +7,7 @@ use crate::{ config::Config, level_manifest::LevelManifest, segment::{meta::SegmentId, Segment}, + HashSet, }; use std::sync::Arc; @@ -24,7 +25,7 @@ pub struct Strategy; /// /// This minimizes the compaction time (+ write amp) for a set of segments we /// want to partially compact. -pub fn choose_least_effort_compaction(segments: &[Arc], n: usize) -> Vec { +pub fn choose_least_effort_compaction(segments: &[Arc], n: usize) -> HashSet { let num_segments = segments.len(); // Ensure that n is not greater than the number of segments @@ -187,7 +188,7 @@ mod tests { compactor.choose(&levels, &Config::default()), Choice::Merge(crate::compaction::Input { dest_level: 0, - segment_ids: vec![0, 1, 2], + segment_ids: [0, 1, 2].into_iter().collect::>(), target_size: u64::MAX }) ); diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 4f825ae1..9222fdd9 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -17,7 +17,7 @@ pub use fifo::Strategy as Fifo; pub use leveled::Strategy as Leveled; pub use tiered::Strategy as SizeTiered; -use crate::{config::Config, level_manifest::LevelManifest, segment::meta::SegmentId}; +use crate::{config::Config, level_manifest::LevelManifest, segment::meta::SegmentId, HashSet}; /// Alias for `Leveled` pub type Levelled = Leveled; @@ -32,7 +32,7 @@ pub use pulldown::Strategy as PullDown; #[derive(Debug, Eq, PartialEq)] pub struct Input { /// Segments to compact - pub segment_ids: Vec, + pub segment_ids: HashSet, /// Level to put the created segments into pub dest_level: u8, @@ -60,7 +60,7 @@ pub enum Choice { /// /// This may be used by a compaction strategy that wants to delete old data /// without having to compact it away, like [`fifo::Strategy`]. - Drop(Vec), + Drop(HashSet), } /// Trait for a compaction strategy diff --git a/src/compaction/pulldown.rs b/src/compaction/pulldown.rs index 43c23e22..826a16fe 100644 --- a/src/compaction/pulldown.rs +++ b/src/compaction/pulldown.rs @@ -3,7 +3,7 @@ // (found in the LICENSE-* files in the repository) use super::{Choice, CompactionStrategy, Input}; -use crate::{level_manifest::LevelManifest, Config}; +use crate::{level_manifest::LevelManifest, Config, HashSet}; /// Pulls down and merges a level into the destination level. /// @@ -23,11 +23,7 @@ impl CompactionStrategy for Strategy { .get(usize::from(self.1)) .expect("next level should exist"); - let mut segment_ids = level - .segments - .iter() - .map(|x| x.metadata.id) - .collect::>(); + let mut segment_ids: HashSet<_> = level.segments.iter().map(|x| x.metadata.id).collect(); segment_ids.extend(next_level.segments.iter().map(|x| x.metadata.id)); diff --git a/src/compaction/tiered.rs b/src/compaction/tiered.rs index 7e0605a2..4024a2a3 100644 --- a/src/compaction/tiered.rs +++ b/src/compaction/tiered.rs @@ -91,7 +91,7 @@ impl CompactionStrategy for Strategy { segments_to_compact.push(segment); } - let segment_ids: Vec<_> = segments_to_compact + let segment_ids = segments_to_compact .iter() .map(|x| &x.metadata.id) .copied() @@ -137,7 +137,7 @@ mod tests { value_block::BlockOffset, Segment, }, - SeqNo, + HashSet, SeqNo, }; use std::sync::Arc; use test_log::test; @@ -235,7 +235,7 @@ mod tests { compactor.choose(&levels, &config), Choice::Merge(CompactionInput { dest_level: 1, - segment_ids: vec![1, 2, 3, 4], + segment_ids: set![1, 2, 3, 4], target_size: u64::MAX, }) ); @@ -264,7 +264,7 @@ mod tests { compactor.choose(&levels, &config), Choice::Merge(CompactionInput { dest_level: 1, - segment_ids: vec![1, 2], + segment_ids: set![1, 2], target_size: u64::MAX, }) ); @@ -297,7 +297,7 @@ mod tests { compactor.choose(&levels, &config), Choice::Merge(CompactionInput { dest_level: 2, - segment_ids: vec![5, 6, 7, 8], + segment_ids: set![5, 6, 7, 8], target_size: u64::MAX, }) ); @@ -325,7 +325,7 @@ mod tests { compactor.choose(&levels, &config), Choice::Merge(CompactionInput { dest_level: 1, - segment_ids: vec![1, 2], + segment_ids: set![1, 2], target_size: u64::MAX, }) ); @@ -353,7 +353,7 @@ mod tests { compactor.choose(&levels, &config), Choice::Merge(CompactionInput { dest_level: 2, - segment_ids: vec![2, 3], + segment_ids: set![2, 3], target_size: u64::MAX, }) ); @@ -368,7 +368,7 @@ mod tests { compactor.choose(&levels, &config), Choice::Merge(CompactionInput { dest_level: 3, - segment_ids: vec![2, 3], + segment_ids: set![2, 3], target_size: u64::MAX, }) ); diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index fcb41147..76247b9e 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -14,7 +14,7 @@ use crate::{ }, stop_signal::StopSignal, tree::inner::{SealedMemtables, TreeId}, - Config, HashSet, + Config, }; use std::{ sync::{atomic::AtomicU64, Arc, RwLock, RwLockWriteGuard}, @@ -138,9 +138,6 @@ fn merge_segments( payload .segment_ids .iter() - // NOTE: Throw away duplicate segment IDs - .collect::>() - .into_iter() .filter_map(|x| segments.get(x)) .cloned() .collect() @@ -163,7 +160,7 @@ fn merge_segments( let last_level = levels.last_level_index(); - levels.hide_segments(&payload.segment_ids); + levels.hide_segments(payload.segment_ids.iter().copied()); drop(levels); @@ -312,7 +309,7 @@ fn merge_segments( if let Err(e) = swap_result { // IMPORTANT: Show the segments again, because compaction failed - original_levels.show_segments(&payload.segment_ids); + original_levels.show_segments(payload.segment_ids.iter().copied()); return Err(e); }; @@ -348,7 +345,7 @@ fn merge_segments( .remove((opts.tree_id, *segment_id).into()); } - original_levels.show_segments(&payload.segment_ids); + original_levels.show_segments(payload.segment_ids.iter().copied()); drop(original_levels); diff --git a/src/level_manifest/level.rs b/src/level_manifest/level.rs index 8eee53e3..9328c94e 100644 --- a/src/level_manifest/level.rs +++ b/src/level_manifest/level.rs @@ -2,7 +2,7 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use crate::{key_range::KeyRange, segment::meta::SegmentId, Segment, UserKey}; +use crate::{key_range::KeyRange, segment::meta::SegmentId, HashSet, Segment, UserKey}; use std::{ops::Bound, sync::Arc}; /// Level of an LSM-tree @@ -47,7 +47,7 @@ impl Default for Level { } impl Level { - pub fn list_ids(&self) -> Vec { + pub fn list_ids(&self) -> HashSet { self.segments.iter().map(|x| x.metadata.id).collect() } diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index 0e6af530..d0da032f 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -407,15 +407,15 @@ impl LevelManifest { output } - pub(crate) fn show_segments(&mut self, keys: &[SegmentId]) { + pub(crate) fn show_segments(&mut self, keys: impl Iterator) { for key in keys { - self.hidden_set.remove(key); + self.hidden_set.remove(&key); } } - pub(crate) fn hide_segments(&mut self, keys: &[SegmentId]) { + pub(crate) fn hide_segments(&mut self, keys: impl Iterator) { for key in keys { - self.hidden_set.insert(*key); + self.hidden_set.insert(key); } } } diff --git a/src/lib.rs b/src/lib.rs index 58c56633..de3f075a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -103,6 +103,13 @@ pub(crate) type HashMap = std::collections::HashMap; pub(crate) type HashSet = std::collections::HashSet; +#[allow(unused)] +macro_rules! set { + ($($x:expr),+ $(,)?) => { + [$($x),+].into_iter().collect::>() + } +} + macro_rules! fail_iter { ($e:expr) => { match $e { From 27e1d8633a3d1538e8adacf5c03621cc6526c2b0 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sun, 27 Oct 2024 17:21:29 +0100 Subject: [PATCH 28/40] fix: attempting bloom filter load even when ptr=0 --- src/compaction/fifo.rs | 2 +- src/compaction/leveled.rs | 2 +- src/compaction/maintenance.rs | 2 +- src/compaction/tiered.rs | 2 +- src/compaction/worker.rs | 24 +++++++++++---------- src/level_manifest/level.rs | 2 +- src/segment/mod.rs | 39 ++++++++++++++++++++--------------- src/segment/writer/mod.rs | 2 +- src/tree/mod.rs | 18 +++++++++------- 9 files changed, 52 insertions(+), 41 deletions(-) diff --git a/src/compaction/fifo.rs b/src/compaction/fifo.rs index f7131d5c..93fb8bf6 100644 --- a/src/compaction/fifo.rs +++ b/src/compaction/fifo.rs @@ -182,7 +182,7 @@ mod tests { block_cache, #[cfg(feature = "bloom")] - bloom_filter: BloomFilter::with_fp_rate(1, 0.1), + bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)), }) } diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index d91ce345..b9c4f1ee 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -392,7 +392,7 @@ mod tests { block_cache, #[cfg(feature = "bloom")] - bloom_filter: BloomFilter::with_fp_rate(1, 0.1), + bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)), }) } diff --git a/src/compaction/maintenance.rs b/src/compaction/maintenance.rs index 4a8e3703..1c83f09f 100644 --- a/src/compaction/maintenance.rs +++ b/src/compaction/maintenance.rs @@ -137,7 +137,7 @@ mod tests { block_cache, #[cfg(feature = "bloom")] - bloom_filter: BloomFilter::with_fp_rate(1, 0.1), + bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)), }) } diff --git a/src/compaction/tiered.rs b/src/compaction/tiered.rs index 4024a2a3..253c1537 100644 --- a/src/compaction/tiered.rs +++ b/src/compaction/tiered.rs @@ -185,7 +185,7 @@ mod tests { block_cache, #[cfg(feature = "bloom")] - bloom_filter: BloomFilter::with_fp_rate(1, 0.1), + bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)), }) } diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index 76247b9e..3eaee0fc 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -263,17 +263,19 @@ fn merge_segments( #[cfg(feature = "bloom")] bloom_filter: { - use crate::coding::Decode; - use std::{ - fs::File, - io::{Seek, SeekFrom}, - }; - - assert!(*bloom_ptr > 0, "can not find bloom filter block"); - - let mut reader = File::open(&segment_file_path)?; - reader.seek(SeekFrom::Start(*bloom_ptr))?; - BloomFilter::decode_from(&mut reader)? + if *bloom_ptr > 0 { + use crate::coding::Decode; + use std::{ + fs::File, + io::{Seek, SeekFrom}, + }; + + let mut reader = File::open(&segment_file_path)?; + reader.seek(SeekFrom::Start(*bloom_ptr))?; + Some(BloomFilter::decode_from(&mut reader)?) + } else { + None + } }, })) }) diff --git a/src/level_manifest/level.rs b/src/level_manifest/level.rs index 9328c94e..df0ba5e4 100644 --- a/src/level_manifest/level.rs +++ b/src/level_manifest/level.rs @@ -277,7 +277,7 @@ mod tests { block_cache, #[cfg(feature = "bloom")] - bloom_filter: BloomFilter::with_fp_rate(1, 0.1), + bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)), }) } diff --git a/src/segment/mod.rs b/src/segment/mod.rs index d1534cda..1a2fa02b 100644 --- a/src/segment/mod.rs +++ b/src/segment/mod.rs @@ -68,7 +68,7 @@ pub struct Segment { /// Bloom filter #[cfg(feature = "bloom")] #[doc(hidden)] - pub bloom_filter: BloomFilter, + pub bloom_filter: Option, } impl std::fmt::Debug for Segment { @@ -193,17 +193,19 @@ impl Segment { // TODO: as Bloom method #[cfg(feature = "bloom")] bloom_filter: { - use crate::coding::Decode; - use std::{ - fs::File, - io::{Seek, SeekFrom}, - }; - - assert!(*bloom_ptr > 0, "can not find bloom filter block"); - - let mut reader = File::open(file_path)?; - reader.seek(SeekFrom::Start(*bloom_ptr))?; - BloomFilter::decode_from(&mut reader)? + if *bloom_ptr > 0 { + use crate::coding::Decode; + use std::{ + fs::File, + io::{Seek, SeekFrom}, + }; + + let mut reader = File::open(file_path)?; + reader.seek(SeekFrom::Start(*bloom_ptr))?; + Some(BloomFilter::decode_from(&mut reader)?) + } else { + None + } }, }) } @@ -212,7 +214,10 @@ impl Segment { #[must_use] /// Gets the bloom filter size pub fn bloom_filter_size(&self) -> usize { - self.bloom_filter.len() + self.bloom_filter + .as_ref() + .map(|x| x.len()) + .unwrap_or_default() } #[cfg(feature = "bloom")] @@ -232,8 +237,8 @@ impl Segment { return Ok(None); } - { - if !self.bloom_filter.contains_hash(hash) { + if let Some(bf) = &self.bloom_filter { + if !bf.contains_hash(hash) { return Ok(None); } } @@ -373,10 +378,10 @@ impl Segment { let key = key.as_ref(); #[cfg(feature = "bloom")] - { + if let Some(bf) = &self.bloom_filter { debug_assert!(false, "Use Segment::get_with_hash instead"); - if !self.bloom_filter.contains(key) { + if !bf.contains(key) { return Ok(None); } } diff --git a/src/segment/writer/mod.rs b/src/segment/writer/mod.rs index a4d7190c..6813791e 100644 --- a/src/segment/writer/mod.rs +++ b/src/segment/writer/mod.rs @@ -138,7 +138,7 @@ impl Writer { bloom_policy: BloomConstructionPolicy::default(), #[cfg(feature = "bloom")] - bloom_hash_buffer: Vec::with_capacity(10_000), + bloom_hash_buffer: Vec::new(), }) } diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 9fc1d533..964521e6 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -54,7 +54,7 @@ impl AbstractTree for Tree { .read() .expect("lock is poisoned") .iter() - .map(|x| x.bloom_filter.len()) + .map(|x| x.bloom_filter_size()) .sum() } @@ -509,14 +509,18 @@ impl Tree { // TODO: as Bloom method #[cfg(feature = "bloom")] bloom_filter: { - use crate::coding::Decode; - use std::io::Seek; + if *bloom_ptr > 0 { + use crate::coding::Decode; + use std::io::Seek; - assert!(*bloom_ptr > 0, "can not find bloom filter block"); + assert!(*bloom_ptr > 0, "can not find bloom filter block"); - let mut reader = std::fs::File::open(&segment_file_path)?; - reader.seek(std::io::SeekFrom::Start(*bloom_ptr))?; - BloomFilter::decode_from(&mut reader)? + let mut reader = std::fs::File::open(&segment_file_path)?; + reader.seek(std::io::SeekFrom::Start(*bloom_ptr))?; + Some(BloomFilter::decode_from(&mut reader)?) + } else { + None + } }, } .into(); From 4b33d01c4aa5552d2e6011e2ade6462d20b6d718 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sun, 27 Oct 2024 22:04:59 +0100 Subject: [PATCH 29/40] use jemalloc in benches --- Cargo.toml | 1 + benches/block.rs | 7 +++++++ benches/tree.rs | 7 +++++++ 3 files changed, 15 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 6c087ead..2abef91a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ criterion = { version = "0.5.1", features = ["html_reports"] } fs_extra = "1.3.0" nanoid = "0.4.0" test-log = "0.2.16" +tikv-jemallocator = "0.5.4" [package.metadata.cargo-all-features] denylist = ["all"] diff --git a/benches/block.rs b/benches/block.rs index 58d6c23b..db92ee5e 100644 --- a/benches/block.rs +++ b/benches/block.rs @@ -10,6 +10,13 @@ use lsm_tree::{ }; use std::io::Write; +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + /* fn value_block_size(c: &mut Criterion) { let mut group = c.benchmark_group("ValueBlock::size"); diff --git a/benches/tree.rs b/benches/tree.rs index 1929db2b..1b801067 100644 --- a/benches/tree.rs +++ b/benches/tree.rs @@ -3,6 +3,13 @@ use lsm_tree::{AbstractTree, BlockCache, Config}; use std::sync::Arc; use tempfile::tempdir; +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + fn full_scan(c: &mut Criterion) { let mut group = c.benchmark_group("scan all"); group.sample_size(10); From 611517b754bcc52b4c754c4b9e5480a5d9ed6735 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sun, 27 Oct 2024 22:15:57 +0100 Subject: [PATCH 30/40] wip --- src/compaction/fifo.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/compaction/fifo.rs b/src/compaction/fifo.rs index 93fb8bf6..4108d0c1 100644 --- a/src/compaction/fifo.rs +++ b/src/compaction/fifo.rs @@ -106,10 +106,8 @@ impl CompactionStrategy for Strategy { super::maintenance::Strategy.choose(levels, config) } } else { - let mut ids = segment_ids_to_delete.into_iter().collect::>(); - ids.sort_unstable(); - - Choice::Drop(ids.into_iter().collect::>()) + let ids = segment_ids_to_delete.into_iter().collect(); + Choice::Drop(ids) } } } From aa7778000dbcf7b7e8a3c984a0874e2176689482 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sun, 27 Oct 2024 22:16:35 +0100 Subject: [PATCH 31/40] wip --- src/tree/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 964521e6..2f1f2dec 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -513,8 +513,6 @@ impl Tree { use crate::coding::Decode; use std::io::Seek; - assert!(*bloom_ptr > 0, "can not find bloom filter block"); - let mut reader = std::fs::File::open(&segment_file_path)?; reader.seek(std::io::SeekFrom::Start(*bloom_ptr))?; Some(BloomFilter::decode_from(&mut reader)?) From e939794e764fe0885030764abc914759e29da3fc Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sun, 27 Oct 2024 22:20:32 +0100 Subject: [PATCH 32/40] fix: msvc --- Cargo.toml | 4 +++- benches/block.rs | 2 +- benches/tree.rs | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2abef91a..5d8edac6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,9 @@ criterion = { version = "0.5.1", features = ["html_reports"] } fs_extra = "1.3.0" nanoid = "0.4.0" test-log = "0.2.16" -tikv-jemallocator = "0.5.4" + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +jemallocator = { version = "0.5.4", optional = true } [package.metadata.cargo-all-features] denylist = ["all"] diff --git a/benches/block.rs b/benches/block.rs index db92ee5e..8b4af995 100644 --- a/benches/block.rs +++ b/benches/block.rs @@ -11,7 +11,7 @@ use lsm_tree::{ use std::io::Write; #[cfg(not(target_env = "msvc"))] -use tikv_jemallocator::Jemalloc; +use jemallocator::Jemalloc; #[cfg(not(target_env = "msvc"))] #[global_allocator] diff --git a/benches/tree.rs b/benches/tree.rs index 1b801067..c54a3869 100644 --- a/benches/tree.rs +++ b/benches/tree.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use tempfile::tempdir; #[cfg(not(target_env = "msvc"))] -use tikv_jemallocator::Jemalloc; +use jemallocator::Jemalloc; #[cfg(not(target_env = "msvc"))] #[global_allocator] From 4b77e5d610cee891ad61efa56a71faf36e062289 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sun, 27 Oct 2024 22:23:27 +0100 Subject: [PATCH 33/40] revert --- Cargo.toml | 3 --- benches/block.rs | 7 ------- benches/tree.rs | 7 ------- 3 files changed, 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5d8edac6..6c087ead 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,9 +47,6 @@ fs_extra = "1.3.0" nanoid = "0.4.0" test-log = "0.2.16" -[target.'cfg(not(target_env = "msvc"))'.dependencies] -jemallocator = { version = "0.5.4", optional = true } - [package.metadata.cargo-all-features] denylist = ["all"] diff --git a/benches/block.rs b/benches/block.rs index 8b4af995..58d6c23b 100644 --- a/benches/block.rs +++ b/benches/block.rs @@ -10,13 +10,6 @@ use lsm_tree::{ }; use std::io::Write; -#[cfg(not(target_env = "msvc"))] -use jemallocator::Jemalloc; - -#[cfg(not(target_env = "msvc"))] -#[global_allocator] -static GLOBAL: Jemalloc = Jemalloc; - /* fn value_block_size(c: &mut Criterion) { let mut group = c.benchmark_group("ValueBlock::size"); diff --git a/benches/tree.rs b/benches/tree.rs index c54a3869..1929db2b 100644 --- a/benches/tree.rs +++ b/benches/tree.rs @@ -3,13 +3,6 @@ use lsm_tree::{AbstractTree, BlockCache, Config}; use std::sync::Arc; use tempfile::tempdir; -#[cfg(not(target_env = "msvc"))] -use jemallocator::Jemalloc; - -#[cfg(not(target_env = "msvc"))] -#[global_allocator] -static GLOBAL: Jemalloc = Jemalloc; - fn full_scan(c: &mut Criterion) { let mut group = c.benchmark_group("scan all"); group.sample_size(10); From c54b9c3f51ff2eed4c38559149f1af4fe2021c98 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 29 Oct 2024 01:49:44 +0100 Subject: [PATCH 34/40] fix: segment range reader not using passed CachePolicy --- src/segment/range.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/segment/range.rs b/src/segment/range.rs index bcfd893a..f5aff678 100644 --- a/src/segment/range.rs +++ b/src/segment/range.rs @@ -23,9 +23,7 @@ pub struct Range { pub(crate) range: (Bound, Bound), - reader: Reader, - - cache_policy: CachePolicy, + pub(crate) reader: Reader, } impl Range { @@ -53,15 +51,13 @@ impl Range { reader, range, - - cache_policy: CachePolicy::Write, } } /// Sets the cache policy #[must_use] pub fn cache_policy(mut self, policy: CachePolicy) -> Self { - self.cache_policy = policy; + self.reader = self.reader.cache_policy(policy); self } From c1d397afb1d030fff77706b80c0348fc6cb772df Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Wed, 30 Oct 2024 02:10:59 +0100 Subject: [PATCH 35/40] refactor --- src/blob_tree/gc/writer.rs | 7 ++----- src/blob_tree/mod.rs | 6 +++--- src/coding.rs | 9 ++++++--- src/level_manifest/mod.rs | 4 ++-- src/segment/file_offsets.rs | 4 ++-- src/segment/meta/compression.rs | 6 +++--- src/segment/meta/mod.rs | 2 +- 7 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/blob_tree/gc/writer.rs b/src/blob_tree/gc/writer.rs index f9f8f849..bbb8b424 100644 --- a/src/blob_tree/gc/writer.rs +++ b/src/blob_tree/gc/writer.rs @@ -38,15 +38,12 @@ impl<'a> value_log::IndexWriter for GcWriter<'a> { } fn finish(&mut self) -> std::io::Result<()> { - use std::io::{Error as IoError, ErrorKind as IoErrorKind}; - log::trace!("Finish blob GC index writer"); #[allow(clippy::significant_drop_in_scrutinee)] for (key, vhandle, size) in self.buffer.drain(..) { - let buf = MaybeInlineValue::Indirect { vhandle, size } - .encode_into_vec() - .map_err(|e| IoError::new(IoErrorKind::Other, e.to_string()))?; + // TODO: encode into slice using Slice::with_size... + let buf = MaybeInlineValue::Indirect { vhandle, size }.encode_into_vec(); self.memtable.insert(InternalValue::from_components( key, diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 9a234d66..2b39a8bd 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -342,7 +342,7 @@ impl AbstractTree for BlobTree { blob_writer.write(&item.key.user_key, value)?; } else { let direct = MaybeInlineValue::Inline(value); - let serialized_direct = direct.encode_into_vec()?; + let serialized_direct = direct.encode_into_vec(); segment_writer.write(InternalValue::new(item.key, serialized_direct))?; } } @@ -530,7 +530,7 @@ impl AbstractTree for BlobTree { // into inline or indirect values let item = MaybeInlineValue::Inline(value.as_ref().into()); - let value = item.encode_into_vec().expect("should serialize"); + let value = item.encode_into_vec(); let value = InternalValue::from_components(key.as_ref(), value, seqno, r#type); lock.insert(value) @@ -544,7 +544,7 @@ impl AbstractTree for BlobTree { // into inline or indirect values let item = MaybeInlineValue::Inline(value.as_ref().into()); - let value = item.encode_into_vec().expect("should serialize"); + let value = item.encode_into_vec(); self.index.insert(key, value, seqno) } diff --git a/src/coding.rs b/src/coding.rs index 42f29fac..82f822ed 100644 --- a/src/coding.rs +++ b/src/coding.rs @@ -79,10 +79,13 @@ pub trait Encode { fn encode_into(&self, writer: &mut W) -> Result<(), EncodeError>; /// Serializes into vector. - fn encode_into_vec(&self) -> Result, EncodeError> { + fn encode_into_vec(&self) -> Vec { let mut v = vec![]; - self.encode_into(&mut v)?; - Ok(v) + + #[allow(clippy::expected_used)] + self.encode_into(&mut v).expect("cannot fail"); + + v } } diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index d0da032f..eb4a8444 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -234,7 +234,7 @@ impl LevelManifest { log::trace!("Writing level manifest to {path:?}",); - let serialized = levels.encode_into_vec()?; + let serialized = levels.encode_into_vec(); // NOTE: Compaction threads don't have concurrent access to the level manifest // because it is behind a mutex @@ -501,7 +501,7 @@ mod tests { is_disjoint: false, }; - let bytes = manifest.deep_clone().encode_into_vec()?; + let bytes = manifest.deep_clone().encode_into_vec(); #[rustfmt::skip] let raw = &[ diff --git a/src/segment/file_offsets.rs b/src/segment/file_offsets.rs index 632d0688..17f14e24 100644 --- a/src/segment/file_offsets.rs +++ b/src/segment/file_offsets.rs @@ -85,7 +85,7 @@ mod tests { tli_ptr: BlockOffset(4), }; - let buf = before.encode_into_vec()?; + let buf = before.encode_into_vec(); let mut cursor = Cursor::new(buf); let after = FileOffsets::decode_from(&mut cursor)?; @@ -97,7 +97,7 @@ mod tests { #[test] fn file_offsets_serialized_len() -> crate::Result<()> { - let buf = FileOffsets::default().encode_into_vec()?; + let buf = FileOffsets::default().encode_into_vec(); assert_eq!(FileOffsets::serialized_len(), buf.len()); Ok(()) } diff --git a/src/segment/meta/compression.rs b/src/segment/meta/compression.rs index 590debb5..00413ceb 100644 --- a/src/segment/meta/compression.rs +++ b/src/segment/meta/compression.rs @@ -116,7 +116,7 @@ mod tests { #[test_log::test] fn compression_serialize_none() -> crate::Result<()> { - let serialized = CompressionType::None.encode_into_vec()?; + let serialized = CompressionType::None.encode_into_vec(); assert_eq!(2, serialized.len()); Ok(()) } @@ -127,7 +127,7 @@ mod tests { #[test_log::test] fn compression_serialize_none() -> crate::Result<()> { - let serialized = CompressionType::Lz4.encode_into_vec()?; + let serialized = CompressionType::Lz4.encode_into_vec(); assert_eq!(2, serialized.len()); Ok(()) } @@ -140,7 +140,7 @@ mod tests { #[test_log::test] fn compression_serialize_none() -> crate::Result<()> { for lvl in 0..10 { - let serialized = CompressionType::Miniz(lvl).encode_into_vec()?; + let serialized = CompressionType::Miniz(lvl).encode_into_vec(); assert_eq!(2, serialized.len()); } Ok(()) diff --git a/src/segment/meta/mod.rs b/src/segment/meta/mod.rs index 44e1d8cf..cf317a32 100644 --- a/src/segment/meta/mod.rs +++ b/src/segment/meta/mod.rs @@ -271,7 +271,7 @@ mod tests { seqnos: (0, 5), }; - let bytes = metadata.encode_into_vec()?; + let bytes = metadata.encode_into_vec(); let mut cursor = Cursor::new(bytes); let metadata_copy = Metadata::decode_from(&mut cursor)?; From a3a174ed9eff0755f671f793626d17f4ef3f5f57 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Wed, 30 Oct 2024 19:05:57 +0100 Subject: [PATCH 36/40] fix: blob flush + gc race condition that could drop new data --- src/blob_tree/gc/reader.rs | 5 +- src/blob_tree/mod.rs | 108 +++++++++++++++++++++++++++------ tests/blob_drop_after_flush.rs | 45 ++++++++++++++ 3 files changed, 139 insertions(+), 19 deletions(-) create mode 100644 tests/blob_drop_after_flush.rs diff --git a/src/blob_tree/gc/reader.rs b/src/blob_tree/gc/reader.rs index 17272d68..2ab94957 100644 --- a/src/blob_tree/gc/reader.rs +++ b/src/blob_tree/gc/reader.rs @@ -36,6 +36,7 @@ impl<'a> GcReader<'a> { impl<'a> value_log::IndexReader for GcReader<'a> { fn get(&self, key: &[u8]) -> std::io::Result> { use std::io::{Error as IoError, ErrorKind as IoErrorKind}; + use MaybeInlineValue::{Indirect, Inline}; let Some(item) = self .get_internal(key) @@ -45,8 +46,8 @@ impl<'a> value_log::IndexReader for GcReader<'a> { }; match item { - MaybeInlineValue::Inline(_) => Ok(None), - MaybeInlineValue::Indirect { vhandle, .. } => Ok(Some(vhandle)), + Inline(_) => Ok(None), + Indirect { vhandle, .. } => Ok(Some(vhandle)), } } } diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 2b39a8bd..4cf4d1b3 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -22,24 +22,31 @@ use index::IndexTree; use std::{ io::Cursor, ops::{RangeBounds, RangeFull}, - sync::{Arc, RwLockWriteGuard}, + sync::{atomic::AtomicUsize, Arc, RwLockWriteGuard}, }; use value::MaybeInlineValue; use value_log::ValueLog; fn resolve_value_handle(vlog: &ValueLog, item: RangeItem) -> RangeItem { + use MaybeInlineValue::{Indirect, Inline}; + match item { Ok((key, value)) => { let mut cursor = Cursor::new(value); - let item = MaybeInlineValue::decode_from(&mut cursor)?; - - match item { - MaybeInlineValue::Inline(bytes) => Ok((key, bytes)), - MaybeInlineValue::Indirect { vhandle, .. } => match vlog.get(&vhandle) { - Ok(Some(bytes)) => Ok((key, bytes)), - Err(e) => Err(e.into()), - _ => panic!("value handle did not match any blob - this is a bug"), - }, + + match MaybeInlineValue::decode_from(&mut cursor)? { + Inline(bytes) => Ok((key, bytes)), + Indirect { vhandle, .. } => { + // Resolve indirection using value log + match vlog.get(&vhandle) { + Ok(Some(bytes)) => Ok((key, bytes)), + Err(e) => Err(e.into()), + _ => { + // TODO: for non-snapshot ranges with periodic GC, this happened + panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(&key)) + } + } + } } } Err(e) => Err(e), @@ -62,6 +69,10 @@ pub struct BlobTree { /// Log-structured value-log that stores large values #[doc(hidden)] pub blobs: ValueLog, + + // TODO: maybe replace this with a nonce system + #[doc(hidden)] + pub pending_segments: Arc, } impl BlobTree { @@ -79,6 +90,7 @@ impl BlobTree { Ok(Self { index, blobs: ValueLog::open(vlog_path, vlog_cfg)?, + pending_segments: Arc::new(AtomicUsize::new(0)), }) } @@ -93,9 +105,27 @@ impl BlobTree { use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use MaybeInlineValue::{Indirect, Inline}; + while self + .pending_segments + .load(std::sync::atomic::Ordering::Acquire) + > 0 + { + // IMPORTANT: Busy wait until all segments in-flight are committed + // to the tree + } + // IMPORTANT: Lock + snapshot memtable to avoid read skew + preventing tampering with memtable let _memtable_lock = self.index.read_lock_active_memtable(); + while self + .pending_segments + .load(std::sync::atomic::Ordering::Acquire) + > 0 + { + // IMPORTANT: Busy wait again until all segments in-flight are committed + // to the tree + } + let iter = self .index .create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None); @@ -193,6 +223,10 @@ impl BlobTree { return Ok(None); }; + // IMPORTANT: Lock active memtable, so there cannot be a GC scan while + // we apply blob & segment files + let _lock = self.index.lock_active_memtable(); + let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)? else { return Ok(None); @@ -347,15 +381,41 @@ impl AbstractTree for BlobTree { } } + let _memtable_lock = self.lock_active_memtable(); + log::trace!("Register blob writer into value log"); self.blobs.register_writer(blob_writer)?; - log::trace!("Creating segment"); - self.index.consume_writer(segment_id, segment_writer) + log::trace!("Creating LSM-tree segment {segment_id}"); + let segment = self.index.consume_writer(segment_id, segment_writer)?; + + // TODO: this can probably solved in a nicer way + if segment.is_some() { + // IMPORTANT: Increment the pending count + // so there cannot be a GC scan now, until the segment is registered + self.pending_segments + .fetch_add(1, std::sync::atomic::Ordering::Release); + } + + Ok(segment) } fn register_segments(&self, segments: &[Arc]) -> crate::Result<()> { - self.index.register_segments(segments) + self.index.register_segments(segments)?; + + let count = self + .pending_segments + .load(std::sync::atomic::Ordering::Acquire); + + assert!( + count >= segments.len(), + "pending_segments is less than segments to register - this is a bug" + ); + + self.pending_segments + .fetch_sub(segments.len(), std::sync::atomic::Ordering::Release); + + Ok(()) } fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Memtable> { @@ -556,7 +616,9 @@ impl AbstractTree for BlobTree { ) -> crate::Result> { use value::MaybeInlineValue::{Indirect, Inline}; - let Some(value) = self.index.get_internal_with_seqno(key.as_ref(), seqno)? else { + let key = key.as_ref(); + + let Some(value) = self.index.get_internal_with_seqno(key, seqno)? else { return Ok(None); }; @@ -564,7 +626,12 @@ impl AbstractTree for BlobTree { Inline(bytes) => Ok(Some(bytes)), Indirect { vhandle, .. } => { // Resolve indirection using value log - Ok(self.blobs.get(&vhandle)?.map(Slice::from)) + match self.blobs.get(&vhandle)? { + Some(bytes) => Ok(Some(bytes)), + None => { + panic!("value handle ({key:?} => {vhandle:?}) did not match any blob - this is a bug") + } + } } } } @@ -572,7 +639,9 @@ impl AbstractTree for BlobTree { fn get>(&self, key: K) -> crate::Result> { use value::MaybeInlineValue::{Indirect, Inline}; - let Some(value) = self.index.get_internal(key.as_ref())? else { + let key = key.as_ref(); + + let Some(value) = self.index.get_internal(key)? else { return Ok(None); }; @@ -580,7 +649,12 @@ impl AbstractTree for BlobTree { Inline(bytes) => Ok(Some(bytes)), Indirect { vhandle, .. } => { // Resolve indirection using value log - Ok(self.blobs.get(&vhandle)?.map(Slice::from)) + match self.blobs.get(&vhandle)? { + Some(bytes) => Ok(Some(bytes)), + None => { + panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(key)) + } + } } } } diff --git a/tests/blob_drop_after_flush.rs b/tests/blob_drop_after_flush.rs new file mode 100644 index 00000000..31e820a4 --- /dev/null +++ b/tests/blob_drop_after_flush.rs @@ -0,0 +1,45 @@ +use lsm_tree::{AbstractTree, Config}; +use std::time::Duration; + +#[test_log::test] +fn blob_drop_after_flush() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + + let tree = Config::new(&folder) + .compression(lsm_tree::CompressionType::None) + .open_as_blob_tree()?; + + tree.insert("a", "neptune".repeat(10_000), 0); + let (id, memtable) = tree.rotate_memtable().unwrap(); + + let segment = tree.flush_memtable(id, &memtable, 0).unwrap().unwrap(); + + // NOTE: Segment is now in-flight + + let gc_report = std::thread::spawn({ + let tree = tree.clone(); + + move || { + let report = tree.gc_scan_stats(1, 0)?; + Ok::<_, lsm_tree::Error>(report) + } + }); + + std::thread::sleep(Duration::from_secs(1)); + + let strategy = value_log::SpaceAmpStrategy::new(1.0); + tree.apply_gc_strategy(&strategy, 0)?; + + tree.register_segments(&[segment])?; + + assert_eq!( + "neptune".repeat(10_000).as_bytes(), + &*tree.get("a")?.unwrap(), + ); + + let report = gc_report.join().unwrap()?; + assert_eq!(0, report.stale_blobs); + assert_eq!(1, report.total_blobs); + + Ok(()) +} From 9339b4611162161c48430bc31c2e693c0198c525 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Wed, 30 Oct 2024 19:44:24 +0100 Subject: [PATCH 37/40] fix: deadlock --- src/blob_tree/mod.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 4cf4d1b3..2bdb5d65 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -223,10 +223,6 @@ impl BlobTree { return Ok(None); }; - // IMPORTANT: Lock active memtable, so there cannot be a GC scan while - // we apply blob & segment files - let _lock = self.index.lock_active_memtable(); - let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)? else { return Ok(None); From 96e811ae8058bad64144912fcfc9999c866c5fbe Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Thu, 31 Oct 2024 17:41:45 +0100 Subject: [PATCH 38/40] test(leveled): intra L0 --- src/compaction/leveled.rs | 40 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index b9c4f1ee..c0825486 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -479,6 +479,46 @@ mod tests { Ok(()) } + #[test] + #[allow( + clippy::cast_sign_loss, + clippy::cast_precision_loss, + clippy::cast_possible_truncation + )] + fn leveled_intra_l0() -> crate::Result<()> { + let tempdir = tempfile::tempdir()?; + let compactor = Strategy { + target_size: 64 * 1_024 * 1_024, + ..Default::default() + }; + + #[rustfmt::skip] + let mut levels = build_levels(tempdir.path(), vec![ + vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)], + vec![], + vec![], + vec![], + ])?; + + assert_eq!( + compactor.choose(&levels, &Config::default()), + Choice::Merge(CompactionInput { + dest_level: 0, + segment_ids: [1, 2, 3, 4].into_iter().collect::>(), + target_size: u64::from(compactor.target_size), + }) + ); + + levels.hide_segments(std::iter::once(4)); + + assert_eq!( + compactor.choose(&levels, &Config::default()), + Choice::DoNothing + ); + + Ok(()) + } + #[test] fn leveled_more_than_min_no_overlap() -> crate::Result<()> { let tempdir = tempfile::tempdir()?; From 2319d7d51628d2e33b0f768c0813e24f7cc8d96f Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Thu, 31 Oct 2024 17:41:54 +0100 Subject: [PATCH 39/40] refactor --- src/compaction/worker.rs | 19 +----------------- src/segment/file_offsets.rs | 3 +-- src/segment/mod.rs | 39 +++++++++++++++++++++---------------- src/segment/range.rs | 26 +++++++++++++++++-------- src/tree/mod.rs | 17 +--------------- 5 files changed, 43 insertions(+), 61 deletions(-) diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index 3eaee0fc..f16cb2cf 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -21,9 +21,6 @@ use std::{ time::Instant, }; -#[cfg(feature = "bloom")] -use crate::bloom::BloomFilter; - #[cfg(feature = "bloom")] use crate::segment::writer::BloomConstructionPolicy; @@ -262,21 +259,7 @@ fn merge_segments( block_index, #[cfg(feature = "bloom")] - bloom_filter: { - if *bloom_ptr > 0 { - use crate::coding::Decode; - use std::{ - fs::File, - io::{Seek, SeekFrom}, - }; - - let mut reader = File::open(&segment_file_path)?; - reader.seek(SeekFrom::Start(*bloom_ptr))?; - Some(BloomFilter::decode_from(&mut reader)?) - } else { - None - } - }, + bloom_filter: Segment::load_bloom(&segment_file_path, bloom_ptr)?, })) }) .collect::>>()?; diff --git a/src/segment/file_offsets.rs b/src/segment/file_offsets.rs index 17f14e24..f25a1910 100644 --- a/src/segment/file_offsets.rs +++ b/src/segment/file_offsets.rs @@ -96,9 +96,8 @@ mod tests { } #[test] - fn file_offsets_serialized_len() -> crate::Result<()> { + fn file_offsets_serialized_len() { let buf = FileOffsets::default().encode_into_vec(); assert_eq!(FileOffsets::serialized_len(), buf.len()); - Ok(()) } } diff --git a/src/segment/mod.rs b/src/segment/mod.rs index 1a2fa02b..d0d72229 100644 --- a/src/segment/mod.rs +++ b/src/segment/mod.rs @@ -151,6 +151,26 @@ impl Segment { Ok(broken_count) } + #[cfg(feature = "bloom")] + pub(crate) fn load_bloom>( + path: P, + ptr: value_block::BlockOffset, + ) -> crate::Result> { + Ok(if *ptr > 0 { + use crate::coding::Decode; + use std::{ + fs::File, + io::{Seek, SeekFrom}, + }; + + let mut reader = File::open(path)?; + reader.seek(SeekFrom::Start(*ptr))?; + Some(BloomFilter::decode_from(&mut reader)?) + } else { + None + }) + } + /// Tries to recover a segment from a file. pub(crate) fn recover>( file_path: P, @@ -190,23 +210,8 @@ impl Segment { block_index: Arc::new(block_index), block_cache, - // TODO: as Bloom method #[cfg(feature = "bloom")] - bloom_filter: { - if *bloom_ptr > 0 { - use crate::coding::Decode; - use std::{ - fs::File, - io::{Seek, SeekFrom}, - }; - - let mut reader = File::open(file_path)?; - reader.seek(SeekFrom::Start(*bloom_ptr))?; - Some(BloomFilter::decode_from(&mut reader)?) - } else { - None - } - }, + bloom_filter: Self::load_bloom(file_path, bloom_ptr)?, }) } @@ -216,7 +221,7 @@ impl Segment { pub fn bloom_filter_size(&self) -> usize { self.bloom_filter .as_ref() - .map(|x| x.len()) + .map(super::bloom::BloomFilter::len) .unwrap_or_default() } diff --git a/src/segment/range.rs b/src/segment/range.rs index f5aff678..af2c8378 100644 --- a/src/segment/range.rs +++ b/src/segment/range.rs @@ -61,9 +61,7 @@ impl Range { self } - fn initialize(&mut self) -> crate::Result<()> { - // TODO: can we skip searching for lower bound until next is called at least once...? - // would make short ranges 1.5-2x faster if only one direction is used + fn initialize_lo_bound(&mut self) -> crate::Result<()> { let start_key = match self.range.start_bound() { Bound::Unbounded => None, Bound::Included(start) | Bound::Excluded(start) => { @@ -77,9 +75,13 @@ impl Range { Some(start) } }; + if let Some(key) = start_key.cloned() { + self.reader.set_lower_bound(key); + } + Ok(()) + } - // TODO: can we skip searching for upper bound until next_back is called at least once...? - // would make short ranges 1.5-2x faster if only one direction is used + fn initialize_hi_bound(&mut self) -> crate::Result<()> { let end_key: Option<&Slice> = match self.range.end_bound() { Bound::Unbounded => { let upper_bound = self @@ -102,12 +104,20 @@ impl Range { } }; - if let Some(key) = start_key.cloned() { - self.reader.set_lower_bound(key); - } if let Some(key) = end_key.cloned() { self.reader.set_upper_bound(key); } + Ok(()) + } + + fn initialize(&mut self) -> crate::Result<()> { + // TODO: can we skip searching for lower bound until next is called at least once...? + // would make short ranges 1.5-2x faster (if cache miss) if only one direction is used + self.initialize_lo_bound()?; + + // TODO: can we skip searching for upper bound until next_back is called at least once...? + // would make short ranges 1.5-2x faster (if cache miss) if only one direction is used + self.initialize_hi_bound()?; self.is_initialized = true; diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 2f1f2dec..ebbe63dc 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -473,9 +473,6 @@ impl Tree { segment_id: SegmentId, mut writer: crate::segment::writer::Writer, ) -> crate::Result>> { - #[cfg(feature = "bloom")] - use crate::bloom::BloomFilter; - let segment_folder = writer.opts.folder.clone(); let segment_file_path = segment_folder.join(segment_id.to_string()); @@ -506,20 +503,8 @@ impl Tree { block_index, block_cache: self.config.block_cache.clone(), - // TODO: as Bloom method #[cfg(feature = "bloom")] - bloom_filter: { - if *bloom_ptr > 0 { - use crate::coding::Decode; - use std::io::Seek; - - let mut reader = std::fs::File::open(&segment_file_path)?; - reader.seek(std::io::SeekFrom::Start(*bloom_ptr))?; - Some(BloomFilter::decode_from(&mut reader)?) - } else { - None - } - }, + bloom_filter: Segment::load_bloom(&segment_file_path, bloom_ptr)?, } .into(); From 7fcf3f440b8c1035c62c6aadc960bcaa104426f5 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Thu, 31 Oct 2024 20:04:09 +0100 Subject: [PATCH 40/40] remove comment (fixed) --- src/blob_tree/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 2bdb5d65..37b04ea7 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -42,7 +42,6 @@ fn resolve_value_handle(vlog: &ValueLog, item: RangeItem) -> Range Ok(Some(bytes)) => Ok((key, bytes)), Err(e) => Err(e.into()), _ => { - // TODO: for non-snapshot ranges with periodic GC, this happened panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(&key)) } }