
Merge branch 'sharded-journal' into main
marvin-j97 committed Dec 3, 2023
2 parents cc61127 + c287605 commit 5d13167
Showing 33 changed files with 1,583 additions and 729 deletions.
Cargo.toml (2 changes: 2 additions & 0 deletions)

@@ -21,6 +21,8 @@ path = "src/lib.rs"
 byteorder = "1.5.0"
 chrono = "0.4.31"
 crc32fast = "1.3.2"
+crossbeam-skiplist = "0.1.1"
+fs_extra = "1.3.0"
 log = "0.4.20"
 lz4_flex = "0.11.1"
 min-max-heap = "1.3.0"
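The two new dependencies hint at the shape of this merge: `fs_extra` for filesystem work and `crossbeam-skiplist` for a concurrent ordered map, the usual building block for a lock-free memtable. Below is a minimal sketch of how a `crossbeam_skiplist::SkipMap` fills that role; it is illustrative only, not this crate's actual memtable code:

```rust
use crossbeam_skiplist::SkipMap;

fn main() {
    // A SkipMap is an ordered map whose insert/get/range take &self,
    // so concurrent readers and writers need no exclusive lock.
    let memtable: SkipMap<Vec<u8>, Vec<u8>> = SkipMap::new();

    memtable.insert(b"key".to_vec(), b"value".to_vec());

    if let Some(entry) = memtable.get(b"key".as_slice()) {
        assert_eq!(entry.value(), b"value");
    }

    // Ordered iteration, which an LSM memtable needs for range scans
    for entry in memtable.range(b"a".to_vec()..) {
        println!("{:?} => {:?}", entry.key(), entry.value());
    }
}
```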
README.md (11 changes: 9 additions & 2 deletions)

@@ -29,6 +29,9 @@ tree.flush()?;
 
 This is the fastest and most feature-rich LSM-tree implementation in Rust! It features, among other things:
 
+- Block based tables with LZ4 compression
+- Prefix searching
+- Range searching
 - Size-tiered or Levelled compaction with concurrency support
 - Partitioned block index to reduce memory footprint and keep startup time minimal [1]
 - Block caching to keep hot data in memory

@@ -39,10 +42,14 @@ This is the fastest and most feature-rich LSM-tree implementation in Rust! It fe
 - Automatic background compaction
 - Does not spawn background threads unless actually needed
 - Thread-safe (internally synchronized)
-- LZ4-compresses data
-- CRC-checks data blocks
 - 100% safe Rust
+
+## Future
+
+- Snapshots
+- Reverse iteration
+- Range tombstones
 
 ## Benchmarks
 
 Testing system:
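The README changes above describe the user-facing API touched by this merge. A hedged usage sketch, based only on calls visible elsewhere in this diff (`Config::new`, `max_memtable_size`, `open`, `prefix`, `flush`); the `insert` signature and the `lsm_tree::Result` alias are assumptions about the crate's API:

```rust
use lsm_tree::Config;

fn main() -> lsm_tree::Result<()> {
    // Open a tree, overriding the (newly lowered) memtable threshold
    let tree = Config::new(".test_data")
        .max_memtable_size(64 * 1_024 * 1_024)
        .open()?;

    tree.insert("planet:earth", "habitable")?; // assumed signature

    // Prefix search, one of the features listed above
    let iter = tree.prefix("planet:")?;
    assert_eq!(iter.into_iter().count(), 1);

    // Persist the memtable and journal
    tree.flush()?;

    Ok(())
}
```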
benches/lsmt.rs (13 changes: 8 additions & 5 deletions)

@@ -42,7 +42,10 @@ use tempfile::tempdir;
 fn memtable_point_reads(c: &mut Criterion) {
     let mut group = c.benchmark_group("memtable point reads");
 
-    let tree = Config::new(tempdir().unwrap()).open().unwrap();
+    let tree = Config::new(tempdir().unwrap())
+        .max_memtable_size(128_000_000)
+        .open()
+        .unwrap();
 
     let max = 1_000_000;
     let lookup_count = 100_000;

@@ -284,7 +287,7 @@ fn scan_vs_query(c: &mut Criterion) {
                 assert_eq!(iter.count(), 1000);
             })
         });
-        group.bench_function(format!("query rev {}", size), |b| {
+        /* group.bench_function(format!("query rev {}", size), |b| {
             b.iter(|| {
                 let iter = tree
                     .range((

@@ -295,7 +298,7 @@ fn scan_vs_query(c: &mut Criterion) {
                 let iter = iter.into_iter();
                 assert_eq!(iter.rev().count(), 1000);
             })
-        });
+        }); */
     }
 }

@@ -340,13 +343,13 @@ fn scan_vs_prefix(c: &mut Criterion) {
                 assert_eq!(iter.count(), 1000);
             });
         });
-        group.bench_function(format!("prefix rev {}", size), |b| {
+        /* group.bench_function(format!("prefix rev {}", size), |b| {
             b.iter(|| {
                 let iter = tree.prefix(prefix).unwrap();
                 let iter = iter.into_iter();
                 assert_eq!(iter.rev().count(), 1000);
             });
-        });
+        }); */
     }
 }
src/batch/mod.rs (29 changes: 18 additions & 11 deletions)

@@ -1,5 +1,3 @@
-use log::trace;
-
 use crate::{Tree, Value};
 
 /// An atomic write batch

@@ -35,8 +33,10 @@ impl Batch {
     ///
     /// Will return `Err` if an IO error occurs
     pub fn commit(mut self) -> crate::Result<()> {
-        let mut commit_log = self.tree.commit_log.lock().expect("lock is poisoned");
-        let mut memtable = self.tree.active_memtable.write().expect("lock is poisoned");
+        let mut shard = self.tree.journal.lock_shard();
+
+        // NOTE: Fully (write) lock, so the batch can be committed atomically
+        let memtable_lock = self.tree.active_memtable.write().expect("lock poisoned");
 
         let batch_seqno = self
             .tree

@@ -47,18 +47,25 @@
             item.seqno = batch_seqno;
         }
 
-        let bytes_written = commit_log.append_batch(self.data.clone())?;
-        commit_log.flush()?;
+        let bytes_written = shard.write_batch(self.data.clone())?;
+        shard.flush()?;
 
-        memtable.size_in_bytes += bytes_written as u32;
+        let memtable_size = self
+            .tree
+            .approx_memtable_size_bytes
+            .fetch_add(bytes_written as u32, std::sync::atomic::Ordering::SeqCst);
 
-        trace!("Applying {} batched items to memtable", self.data.len());
+        log::trace!("Applying {} batched items to memtable", self.data.len());
         for entry in std::mem::take(&mut self.data) {
-            memtable.insert(entry, 0);
+            memtable_lock.insert(entry);
         }
 
-        if memtable.exceeds_threshold(self.tree.config.max_memtable_size) {
-            crate::flush::start(&self.tree, commit_log, memtable)?;
+        drop(memtable_lock);
+        drop(shard);
+
+        if memtable_size > self.tree.config.max_memtable_size {
+            log::debug!("Memtable reached threshold size");
+            crate::flush::start(&self.tree)?;
         }
 
         Ok(())
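The rewritten `commit` shows the core of the sharded-journal design: a writer locks only one journal shard (instead of a single global commit log) plus the active memtable, and the memtable size moves from a field behind the lock to a shared atomic counter. A minimal sketch of the shard-locking idea, with hypothetical names (the crate's real `Journal::lock_shard` is not shown in this diff):

```rust
use std::sync::{Mutex, MutexGuard};

// Hypothetical sharded journal: each shard is its own commit log,
// so concurrent writers only contend when they hit the same shard.
struct Journal {
    shards: Vec<Mutex<Vec<u8>>>,
}

impl Journal {
    fn lock_shard(&self) -> MutexGuard<'_, Vec<u8>> {
        // Prefer a shard that is free right now...
        for shard in &self.shards {
            if let Ok(guard) = shard.try_lock() {
                return guard;
            }
        }
        // ...otherwise block on an arbitrary one
        self.shards[0].lock().expect("lock is poisoned")
    }
}
```

Compared to the old single `commit_log` mutex, a write-heavy workload with N shards can keep up to N batch commits in flight at once.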
src/compaction/tiered.rs (7 changes: 6 additions & 1 deletion)

@@ -86,12 +86,17 @@ mod tests {
         levels::Levels,
         segment::{index::MetaIndex, meta::Metadata, Segment},
     };
-    use std::sync::Arc;
+    use std::{
+        fs::File,
+        io::BufReader,
+        sync::{Arc, Mutex},
+    };
 
     fn fixture_segment(id: String) -> Arc<Segment> {
         let block_cache = Arc::new(BlockCache::new(0));
 
         Arc::new(Segment {
+            file: Mutex::new(BufReader::new(File::open("Cargo.toml").unwrap())),
             block_index: Arc::new(MetaIndex::new(id.clone(), block_cache.clone())),
             metadata: Metadata {
                 path: ".".into(),
src/compaction/worker.rs (18 changes: 15 additions & 3 deletions)

@@ -6,7 +6,9 @@ use crate::{
     Tree,
 };
 use std::{
-    sync::{Arc, RwLockWriteGuard},
+    fs::File,
+    io::BufReader,
+    sync::{Arc, Mutex, RwLockWriteGuard},
     time::Instant,
 };

@@ -35,17 +37,22 @@ pub(crate) fn do_compaction(
         };
 
         MergeIterator::from_segments(&to_merge)?
+            .evict_old_versions(false /* TODO: evict if there are no open snapshots */)
     };
 
     segments_lock.hide_segments(&payload.segment_ids);
     drop(segments_lock);
     log::trace!("Freed segment lock");
 
+    // NOTE: Only evict tombstones when reaching the last level,
+    // That way we don't resurrect data beneath the tombstone
+    let should_evict_tombstones = payload.dest_level == (tree.config.levels - 1);
+
     let mut segment_writer = MultiWriter::new(
         payload.target_size,
         crate::segment::writer::Options {
             block_size: tree.config.block_size,
-            evict_tombstones: payload.dest_level == (tree.config.levels - 1),
+            evict_tombstones: should_evict_tombstones,
             path: tree.path().join("segments"),
         },
     )?;

@@ -67,6 +74,7 @@
         let path = metadata.path.clone();
 
         Ok(Segment {
+            file: Mutex::new(BufReader::new(File::open(path.join("blocks"))?)),
             metadata,
             block_cache: Arc::clone(&tree.block_cache),
             block_index: MetaIndex::from_file(segment_id, path, Arc::clone(&tree.block_cache))?

@@ -98,12 +106,16 @@
         segments_lock.remove(key);
     }
 
+    // NOTE: This is really important
+    // Write the segment with the removed segments first
+    // Otherwise the folder is deleted, but the segment is still referenced!
+    segments_lock.write_to_disk()?;
+
     for key in &payload.segment_ids {
         log::trace!("rm -rf segment folder {}", key);
         std::fs::remove_dir_all(tree.path().join("segments").join(key))?;
     }
 
-    segments_lock.write_to_disk()?;
     segments_lock.show_segments(&payload.segment_ids);
 
     drop(memtable_lock);
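The reordered `write_to_disk` call in the last hunk is a crash-safety fix worth spelling out: the list of live segments must be persisted before the obsolete segment folders are deleted. A sketch of the invariant, using hypothetical stand-in types rather than this crate's real ones:

```rust
use std::io;

// Hypothetical stand-in for the crate's segment manifest
struct Manifest;

impl Manifest {
    fn write_to_disk(&self) -> io::Result<()> {
        // ...serialize and fsync the list of live segments here...
        Ok(())
    }
}

fn finish_compaction(manifest: &Manifest, obsolete: &[String]) -> io::Result<()> {
    // 1. Durably record that the obsolete segments are no longer referenced.
    manifest.write_to_disk()?;

    // 2. Only then delete their folders. A crash between steps 1 and 2
    //    merely leaks orphaned folders (cleanable at startup); the old
    //    order could leave a manifest pointing at deleted data.
    for id in obsolete {
        std::fs::remove_dir_all(format!("segments/{}", id))?;
    }
    Ok(())
}
```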
src/config.rs (2 changes: 1 addition & 1 deletion)

@@ -54,7 +54,7 @@ impl Default for Config {
             path: ".lsm.data".into(),
             block_size: 4_096,
             block_cache_size: 1_024,
-            max_memtable_size: 128 * 1_024 * 1_024,
+            max_memtable_size: 64 * 1_024 * 1_024,
             levels: 7,
             compaction_strategy: Arc::new(tiered::Strategy::default()),
             flush_threads: 4,
src/disk_block.rs (13 changes: 4 additions & 9 deletions)

@@ -1,11 +1,7 @@
 use crate::serde::{Deserializable, DeserializeError, Serializable, SerializeError};
 use byteorder::{BigEndian, ReadBytesExt};
 use lz4_flex::decompress_size_prepended;
-use std::{
-    fs::File,
-    io::{BufReader, Cursor, Read, Seek, Write},
-    path::Path,
-};
+use std::io::{Cursor, Read, Write};
 
 /// Contains the items of a block after decompressing & deserializing.
 ///

@@ -28,15 +24,14 @@ impl<T: Clone + Serializable + Deserializable> DiskBlock<T> {
         Ok(block)
     }
 
-    pub fn from_file_compressed<P: AsRef<Path>>(
-        path: P,
+    pub fn from_file_compressed<R: std::io::Read + std::io::Seek>(
+        reader: &mut R,
         offset: u64,
         size: u32,
     ) -> crate::Result<Self> {
         // Read bytes from disk
-        let mut reader = BufReader::new(File::open(path)?);
         reader.seek(std::io::SeekFrom::Start(offset))?;
-        Self::from_reader_compressed(&mut reader, size)
+        Self::from_reader_compressed(reader, size)
     }
 }
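This refactor inverts the I/O ownership: `from_file_compressed` no longer opens the file itself but borrows any `Read + Seek`, which is what lets `Segment` (see the changes above) keep one long-lived `Mutex<BufReader<File>>` instead of paying a file open per block read. A sketch of the caller pattern under that assumption; `read_block_bytes` is a hypothetical helper, not a function from this crate:

```rust
use std::{
    fs::File,
    io::{BufReader, Read, Seek, SeekFrom},
    sync::Mutex,
};

// Read `size` raw bytes at `offset` through a shared, reusable reader,
// mirroring how the new Segment.file field appears to be used.
fn read_block_bytes(
    file: &Mutex<BufReader<File>>,
    offset: u64,
    size: u32,
) -> std::io::Result<Vec<u8>> {
    let mut reader = file.lock().expect("lock is poisoned");
    reader.seek(SeekFrom::Start(offset))?;

    let mut buf = vec![0u8; size as usize];
    reader.read_exact(&mut buf)?;
    Ok(buf)
}
```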
[diff truncated: the remaining 25 changed files are not rendered]
