From 1ad333ee9fd00fca0f063c091502072dcbe11252 Mon Sep 17 00:00:00 2001 From: yito88 Date: Sat, 4 Nov 2023 21:40:47 +0100 Subject: [PATCH] fix tests --- README.md | 4 ++-- src/flush_writer.rs | 10 ++++++---- src/fptree/inner.rs | 22 +++++++++++----------- src/fptree/leaf_manager/mod.rs | 12 +++++------- src/fptree/mod.rs | 4 ++-- src/fptree_manager.rs | 13 ++++++------- src/kvs.rs | 21 +++++++++++++++++---- src/util/file_util.rs | 21 +++++++++------------ tests/kvs_test.rs | 4 ++-- 9 files changed, 60 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index a1ed8d5..d7a18c4 100644 --- a/README.md +++ b/README.md @@ -35,8 +35,8 @@ An embedded key-value store # Threshold for flushing Basically, the number of keys triggers flushing (Converting FPTree to SSTable) because the number of the root split times on the FPTree is the threshold. -Where you insert K keys and the N split happens, the relation between K and N would be: +When the $N$-th root split happens after inserting $K$ keys, the relation between $K$ and $N$ is: ```math -8 \times (3^{n+1} + 1) +K = 8 \times (3^{N+1} + 1) ``` If you set the `root_split_threshold` to 6, flush would happen every 17,504 insertions. 
diff --git a/src/flush_writer.rs b/src/flush_writer.rs index 5268b03..9cebb1c 100644 --- a/src/flush_writer.rs +++ b/src/flush_writer.rs @@ -1,6 +1,6 @@ use bloomfilter::Bloom; use crossbeam_channel::Receiver; -use log::debug; +use log::{debug, trace}; use mockall_double::double; use std::fs::File; use std::io::{BufWriter, Write}; @@ -23,6 +23,7 @@ const WRITE_BUFFER_SIZE: usize = 1 << 18; #[derive(Debug, Clone)] pub enum FlushSignal { TryFlush, + Shutdown, } pub fn spawn_flush_writer( @@ -42,6 +43,7 @@ pub fn spawn_flush_writer( fptree_manager.switch_fptree().unwrap(); } } + FlushSignal::Shutdown => break, } } }) @@ -68,12 +70,12 @@ impl FlushWriter { first_leaf: Arc>, ) -> Result<(usize, Bloom>, SparseIndex), std::io::Error> { debug!( - "Starting flush: table {}, table ID {}", + "Starting flush FPTree of {} to SSTable ID {}", self.name, self.table_id ); let leaf_manager = first_leaf.read().unwrap().get_leaf_manager(); let id_list = leaf_manager.read().unwrap().get_leaf_id_chain(); - debug!("leaf ID list: {:?}", id_list); + trace!("leaf ID list: {:?}", id_list); self.flush_kv(leaf_manager, id_list) } @@ -94,7 +96,7 @@ impl FlushWriter { fn create_new_table(&mut self) -> Result<(usize, File), std::io::Error> { let id = self.table_id; let table_file_path = self.config.get_table_file_path(&self.name, id); - let table_file = File::create(&table_file_path)?; + let table_file = File::create(table_file_path)?; // odd ID used by compactions self.table_id += 2; diff --git a/src/fptree/inner.rs b/src/fptree/inner.rs index 3193a76..b376322 100644 --- a/src/fptree/inner.rs +++ b/src/fptree/inner.rs @@ -65,7 +65,7 @@ impl Node for Inner { let child = self.get_child(key).unwrap(); let new_child = child.read().unwrap().get_next().unwrap(); - match self.keys.binary_search(&inserted_key) { + match self.keys.binary_search(inserted_key) { Ok(_) => panic!("should not reach here"), Err(i) => { self.keys.insert(i, inserted_key.clone()); @@ -97,11 +97,11 @@ impl Node for Inner { let 
new_children = self.children.split_off((FANOUT + 1) / 2 + 1); let mut new_inner = Inner::new(); - for i in 1..new_keys.len() { - new_inner.add_key(&new_keys[i]); + for new_key in new_keys.into_iter().skip(1) { + new_inner.add_key(new_key); } - for i in 0..new_children.len() { - new_inner.add_child(new_children[i].clone()); + for new_child in new_children { + new_inner.add_child(new_child.clone()); } if let Some(next) = self.get_next() { new_inner.next = Some(next.clone()); @@ -134,8 +134,8 @@ impl Inner { self.keys.len() > FANOUT } - pub fn add_key(&mut self, key: &Vec) { - self.keys.push(key.clone()); + pub fn add_key(&mut self, key: Vec) { + self.keys.push(key); } pub fn add_child(&mut self, child: Arc>) { @@ -211,15 +211,15 @@ mod tests { #[test] fn test_get_child() { let mut inner = Inner::new(); - inner.add_key(&vec![10 as u8]); + inner.add_key(vec![10 as u8]); let mut new_child1 = Inner::new(); - new_child1.add_key(&vec![1 as u8]); + new_child1.add_key(vec![1 as u8]); let arc_new_child1: Arc> = Arc::new(RwLock::new(new_child1)); inner.add_child(arc_new_child1.clone()); let mut new_child2 = Inner::new(); - new_child2.add_key(&vec![11 as u8]); + new_child2.add_key(vec![11 as u8]); let arc_new_child2: Arc> = Arc::new(RwLock::new(new_child2)); inner.add_child(arc_new_child2.clone()); @@ -235,7 +235,7 @@ mod tests { assert_eq!(inner.need_split(), false); for i in 0..(FANOUT + 1) { - inner.add_key(&vec![i as u8]); + inner.add_key(vec![i as u8]); } assert_eq!(inner.need_split(), true); } diff --git a/src/fptree/leaf_manager/mod.rs b/src/fptree/leaf_manager/mod.rs index 2c04516..f3e3559 100644 --- a/src/fptree/leaf_manager/mod.rs +++ b/src/fptree/leaf_manager/mod.rs @@ -32,9 +32,8 @@ pub struct LeafManager { impl LeafManager { pub fn new(name: &str, id: usize, config: &Config) -> Result { let data_dir = config.get_leaf_dir_path(name); - match std::fs::create_dir_all(&data_dir) { - Ok(_) => (), - Err(e) => panic!("{} - {}", &data_dir, e), + if let Err(e) = 
std::fs::create_dir_all(&data_dir) { + unreachable!("Creating {} failed: {}", data_dir, e); } let file_path = config.get_leaf_file_path(name, id); @@ -46,7 +45,7 @@ impl LeafManager { }; if !is_created { - debug!("recovering headers for FPTree {}", id); + debug!("Recovering headers for FPTree {}", id); manager.recover_state()?; } @@ -145,9 +144,7 @@ impl LeafManager { }; encoded.extend(&data_util::calc_crc(&encoded).to_le_bytes()); mmap.copy_from_slice(&encoded); - mmap.flush()?; - - Ok(()) + mmap.flush() } pub fn read_data( @@ -228,6 +225,7 @@ impl LeafManager { // validate the header let magic = u32::from_le_bytes(mmap[0..LEN_HEADER_MAGIC].try_into().unwrap()); if magic != HEADER_MAGIC { + warn!("Header magic was not found"); self.free_leaves.push_back(id); continue; } diff --git a/src/fptree/mod.rs b/src/fptree/mod.rs index 76146ee..9d9da6a 100644 --- a/src/fptree/mod.rs +++ b/src/fptree/mod.rs @@ -52,7 +52,7 @@ impl FPTree { fn split_root( &self, - key: &Vec, + key: Vec, mut locked_root: RwLockWriteGuard>>, locked_new_child: Arc>, ) { @@ -107,7 +107,7 @@ impl FPTree { if locked_node.is_root() { locked_node.set_root(false); let new_child = locked_node.get_next().unwrap(); - self.split_root(&inserted, locked_root, new_child); + self.split_root(inserted, locked_root, new_child); return Ok(()); } } else { diff --git a/src/fptree_manager.rs b/src/fptree_manager.rs index 83cb574..30c8f23 100644 --- a/src/fptree_manager.rs +++ b/src/fptree_manager.rs @@ -124,19 +124,18 @@ impl FPTreeManager { let mut locked_new = self.new_fptree_ptr.write().unwrap(); match &*locked_new { Some(n) => { - let leaf_file = self - .config - .get_leaf_file_path(&self.name, *locked_fptree_id); - std::fs::remove_file(leaf_file)?; - *self.fptree_ptr.write().unwrap() = n.clone(); - *locked_new = None; + let deleted_id = *locked_fptree_id; *locked_fptree_id += 1; + *locked_new = None; + + let leaf_file = self.config.get_leaf_file_path(&self.name, deleted_id); + std::fs::remove_file(leaf_file)?; } 
None => unreachable!("No new FPTree when flushing"), } - info!("completed flushing FPTree {}", *locked_fptree_id - 1); + info!("Completed flushing FPTree {}", *locked_fptree_id - 1); Ok(()) } diff --git a/src/kvs.rs b/src/kvs.rs index 1abb865..db3cecf 100644 --- a/src/kvs.rs +++ b/src/kvs.rs @@ -1,5 +1,5 @@ use crossbeam_channel::Sender; -use log::{debug, info, trace}; +use log::{debug, error, info, trace}; use std::path::Path; use std::sync::Arc; use std::thread::JoinHandle; @@ -14,7 +14,7 @@ use crate::util::file_util; pub struct KVS { fptree_manager: Arc, sstable_manager: Arc, - flush_writer_handle: JoinHandle<()>, + flush_writer_handle: Option>, sender: Sender, } @@ -49,11 +49,11 @@ impl KVS { fptree_manager.clone(), sstable_manager.clone(), ); - info!("Amphis has started: table {}", name); + info!("Amphis KVS has started: table {}", name); Ok(KVS { fptree_manager, sstable_manager, - flush_writer_handle, + flush_writer_handle: Some(flush_writer_handle), sender: tx, }) } @@ -107,3 +107,16 @@ impl KVS { self.fptree_manager.delete(key) } } + +impl Drop for KVS { + fn drop(&mut self) { + info!("Wait for the flushing for shutting down..."); + let _ = self.sender.send(FlushSignal::Shutdown); + if let Some(handle) = self.flush_writer_handle.take() { + if let Err(e) = handle.join() { + error!("FlushWrite failed to shut down: {e:?}"); + } + } + info!("Shutdown gracefully"); + } +} diff --git a/src/util/file_util.rs b/src/util/file_util.rs index ec2c57c..caa8a09 100644 --- a/src/util/file_util.rs +++ b/src/util/file_util.rs @@ -1,7 +1,8 @@ -use log::warn; +use log::info; use std::fs::{File, OpenOptions}; use std::io::ErrorKind; use std::path::Path; +use std::str::FromStr; pub fn open_file(file_path: &str) -> Result<(File, bool), std::io::Error> { let mut is_created = false; @@ -20,7 +21,7 @@ pub fn open_file(file_path: &str) -> Result<(File, bool), std::io::Error> { .create(true) .open(&file_path)?; f.sync_all()?; - warn!("New file {} is created", file_path); + 
info!("New file {} is created", file_path); is_created = true; f } @@ -40,14 +41,10 @@ pub fn get_tree_id(path: &Path) -> Option { } fn get_id(path: &Path, prefix: &str) -> Option { - match path.file_stem().expect("cannot get the file name").to_str() { - Some(file) if file.starts_with(prefix) => match file.strip_prefix(prefix) { - Some(id_str) => match id_str.parse::() { - Ok(id) => return Some(id), - Err(_) => return None, - }, - None => None, - }, - _ => None, - } + let file = path + .file_stem() + .expect("cannot get the file name") + .to_str()?; + file.strip_prefix(prefix) + .and_then(|id| usize::from_str(id).ok()) } diff --git a/tests/kvs_test.rs b/tests/kvs_test.rs index a529c40..246b2e1 100644 --- a/tests/kvs_test.rs +++ b/tests/kvs_test.rs @@ -95,8 +95,8 @@ fn test_recovery() { #[test] fn concurrent_insert() { let _ = env_logger::builder().is_test(true).try_init(); - const NUM_INSERTION: usize = 1025; - const NUM_THREADS: usize = 4; + const NUM_INSERTION: usize = 2345; + const NUM_THREADS: usize = 8; const TABLE_NAME: &str = "concurrency_test"; let config = Config::new(); let kvs = Arc::new(amphis::kvs::KVS::new(TABLE_NAME, config).expect("failed to start Amphis"));