Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yito88 committed Nov 4, 2023
1 parent 118565b commit 1ad333e
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 51 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ An embedded key-value store

# Threshold for flushing
Basically, the number of keys triggers flushing (Converting FPTree to SSTable) because the number of the root split times on the FPTree is the threshold.
Where you insert K keys and the N split happens, the relation between K and N would be:
Where you insert K keys and the N split happens, the relation between $K$ and $N$ would be:
```math
8 \times (3^{n+1} + 1)
K = 8 \times (3^{N+1} + 1)
```
If you set the `root_split_threshold` to 6, flush would happen every 17,504 insertions.
10 changes: 6 additions & 4 deletions src/flush_writer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bloomfilter::Bloom;
use crossbeam_channel::Receiver;
use log::debug;
use log::{debug, trace};
use mockall_double::double;
use std::fs::File;
use std::io::{BufWriter, Write};
Expand All @@ -23,6 +23,7 @@ const WRITE_BUFFER_SIZE: usize = 1 << 18;
#[derive(Debug, Clone)]
pub enum FlushSignal {
TryFlush,
Shutdown,
}

pub fn spawn_flush_writer(
Expand All @@ -42,6 +43,7 @@ pub fn spawn_flush_writer(
fptree_manager.switch_fptree().unwrap();
}
}
FlushSignal::Shutdown => break,
}
}
})
Expand All @@ -68,12 +70,12 @@ impl FlushWriter {
first_leaf: Arc<RwLock<Leaf>>,
) -> Result<(usize, Bloom<Vec<u8>>, SparseIndex), std::io::Error> {
debug!(
"Starting flush: table {}, table ID {}",
"Starting flush FPTree of {} to SSTable ID {}",
self.name, self.table_id
);
let leaf_manager = first_leaf.read().unwrap().get_leaf_manager();
let id_list = leaf_manager.read().unwrap().get_leaf_id_chain();
debug!("leaf ID list: {:?}", id_list);
trace!("leaf ID list: {:?}", id_list);

self.flush_kv(leaf_manager, id_list)
}
Expand All @@ -94,7 +96,7 @@ impl FlushWriter {
fn create_new_table(&mut self) -> Result<(usize, File), std::io::Error> {
let id = self.table_id;
let table_file_path = self.config.get_table_file_path(&self.name, id);
let table_file = File::create(&table_file_path)?;
let table_file = File::create(table_file_path)?;

// odd ID used by compactions
self.table_id += 2;
Expand Down
22 changes: 11 additions & 11 deletions src/fptree/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl Node for Inner {
let child = self.get_child(key).unwrap();
let new_child = child.read().unwrap().get_next().unwrap();

match self.keys.binary_search(&inserted_key) {
match self.keys.binary_search(inserted_key) {
Ok(_) => panic!("should not reach here"),
Err(i) => {
self.keys.insert(i, inserted_key.clone());
Expand Down Expand Up @@ -97,11 +97,11 @@ impl Node for Inner {
let new_children = self.children.split_off((FANOUT + 1) / 2 + 1);

let mut new_inner = Inner::new();
for i in 1..new_keys.len() {
new_inner.add_key(&new_keys[i]);
for new_key in new_keys.into_iter().skip(1) {
new_inner.add_key(new_key);
}
for i in 0..new_children.len() {
new_inner.add_child(new_children[i].clone());
for new_child in new_children {
new_inner.add_child(new_child.clone());
}
if let Some(next) = self.get_next() {
new_inner.next = Some(next.clone());
Expand Down Expand Up @@ -134,8 +134,8 @@ impl Inner {
self.keys.len() > FANOUT
}

pub fn add_key(&mut self, key: &Vec<u8>) {
self.keys.push(key.clone());
pub fn add_key(&mut self, key: Vec<u8>) {
self.keys.push(key);
}

pub fn add_child(&mut self, child: Arc<RwLock<dyn Node + Send + Sync>>) {
Expand Down Expand Up @@ -211,15 +211,15 @@ mod tests {
#[test]
fn test_get_child() {
let mut inner = Inner::new();
inner.add_key(&vec![10 as u8]);
inner.add_key(vec![10 as u8]);

let mut new_child1 = Inner::new();
new_child1.add_key(&vec![1 as u8]);
new_child1.add_key(vec![1 as u8]);
let arc_new_child1: Arc<RwLock<dyn Node + Send + Sync>> = Arc::new(RwLock::new(new_child1));
inner.add_child(arc_new_child1.clone());

let mut new_child2 = Inner::new();
new_child2.add_key(&vec![11 as u8]);
new_child2.add_key(vec![11 as u8]);
let arc_new_child2: Arc<RwLock<dyn Node + Send + Sync>> = Arc::new(RwLock::new(new_child2));
inner.add_child(arc_new_child2.clone());

Expand All @@ -235,7 +235,7 @@ mod tests {
assert_eq!(inner.need_split(), false);

for i in 0..(FANOUT + 1) {
inner.add_key(&vec![i as u8]);
inner.add_key(vec![i as u8]);
}
assert_eq!(inner.need_split(), true);
}
Expand Down
12 changes: 5 additions & 7 deletions src/fptree/leaf_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ pub struct LeafManager {
impl LeafManager {
pub fn new(name: &str, id: usize, config: &Config) -> Result<Self, std::io::Error> {
let data_dir = config.get_leaf_dir_path(name);
match std::fs::create_dir_all(&data_dir) {
Ok(_) => (),
Err(e) => panic!("{} - {}", &data_dir, e),
if let Err(e) = std::fs::create_dir_all(&data_dir) {
unreachable!("Creating {} failed: {}", data_dir, e);
}

let file_path = config.get_leaf_file_path(name, id);
Expand All @@ -46,7 +45,7 @@ impl LeafManager {
};

if !is_created {
debug!("recovering headers for FPTree {}", id);
debug!("Recovering headers for FPTree {}", id);
manager.recover_state()?;
}

Expand Down Expand Up @@ -145,9 +144,7 @@ impl LeafManager {
};
encoded.extend(&data_util::calc_crc(&encoded).to_le_bytes());
mmap.copy_from_slice(&encoded);
mmap.flush()?;

Ok(())
mmap.flush()
}

pub fn read_data(
Expand Down Expand Up @@ -228,6 +225,7 @@ impl LeafManager {
// validate the header
let magic = u32::from_le_bytes(mmap[0..LEN_HEADER_MAGIC].try_into().unwrap());
if magic != HEADER_MAGIC {
warn!("Header magic was not found");
self.free_leaves.push_back(id);
continue;
}
Expand Down
4 changes: 2 additions & 2 deletions src/fptree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl FPTree {

fn split_root(
&self,
key: &Vec<u8>,
key: Vec<u8>,
mut locked_root: RwLockWriteGuard<Arc<RwLock<dyn Node + Send + Sync>>>,
locked_new_child: Arc<RwLock<dyn Node + Send + Sync>>,
) {
Expand Down Expand Up @@ -107,7 +107,7 @@ impl FPTree {
if locked_node.is_root() {
locked_node.set_root(false);
let new_child = locked_node.get_next().unwrap();
self.split_root(&inserted, locked_root, new_child);
self.split_root(inserted, locked_root, new_child);
return Ok(());
}
} else {
Expand Down
13 changes: 6 additions & 7 deletions src/fptree_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,18 @@ impl FPTreeManager {
let mut locked_new = self.new_fptree_ptr.write().unwrap();
match &*locked_new {
Some(n) => {
let leaf_file = self
.config
.get_leaf_file_path(&self.name, *locked_fptree_id);
std::fs::remove_file(leaf_file)?;

*self.fptree_ptr.write().unwrap() = n.clone();
*locked_new = None;
let deleted_id = *locked_fptree_id;
*locked_fptree_id += 1;
*locked_new = None;

let leaf_file = self.config.get_leaf_file_path(&self.name, deleted_id);
std::fs::remove_file(leaf_file)?;
}
None => unreachable!("No new FPTree when flushing"),
}

info!("completed flushing FPTree {}", *locked_fptree_id - 1);
info!("Completed flushing FPTree {}", *locked_fptree_id - 1);

Ok(())
}
Expand Down
21 changes: 17 additions & 4 deletions src/kvs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crossbeam_channel::Sender;
use log::{debug, info, trace};
use log::{debug, error, info, trace};
use std::path::Path;
use std::sync::Arc;
use std::thread::JoinHandle;
Expand All @@ -14,7 +14,7 @@ use crate::util::file_util;
pub struct KVS {
fptree_manager: Arc<FPTreeManager>,
sstable_manager: Arc<SstableManager>,
flush_writer_handle: JoinHandle<()>,
flush_writer_handle: Option<JoinHandle<()>>,
sender: Sender<FlushSignal>,
}

Expand Down Expand Up @@ -49,11 +49,11 @@ impl KVS {
fptree_manager.clone(),
sstable_manager.clone(),
);
info!("Amphis has started: table {}", name);
info!("Amphis KVS has started: table {}", name);
Ok(KVS {
fptree_manager,
sstable_manager,
flush_writer_handle,
flush_writer_handle: Some(flush_writer_handle),
sender: tx,
})
}
Expand Down Expand Up @@ -107,3 +107,16 @@ impl KVS {
self.fptree_manager.delete(key)
}
}

impl Drop for KVS {
fn drop(&mut self) {
info!("Wait for the flushing for shutting down...");
let _ = self.sender.send(FlushSignal::Shutdown);
if let Some(handle) = self.flush_writer_handle.take() {
if let Err(e) = handle.join() {
error!("FlushWrite failed to shut down: {e:?}");
}
}
info!("Shutdown gracefully");
}
}
21 changes: 9 additions & 12 deletions src/util/file_util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use log::warn;
use log::info;
use std::fs::{File, OpenOptions};
use std::io::ErrorKind;
use std::path::Path;
use std::str::FromStr;

pub fn open_file(file_path: &str) -> Result<(File, bool), std::io::Error> {
let mut is_created = false;
Expand All @@ -20,7 +21,7 @@ pub fn open_file(file_path: &str) -> Result<(File, bool), std::io::Error> {
.create(true)
.open(&file_path)?;
f.sync_all()?;
warn!("New file {} is created", file_path);
info!("New file {} is created", file_path);
is_created = true;
f
}
Expand All @@ -40,14 +41,10 @@ pub fn get_tree_id(path: &Path) -> Option<usize> {
}

fn get_id(path: &Path, prefix: &str) -> Option<usize> {
match path.file_stem().expect("cannot get the file name").to_str() {
Some(file) if file.starts_with(prefix) => match file.strip_prefix(prefix) {
Some(id_str) => match id_str.parse::<usize>() {
Ok(id) => return Some(id),
Err(_) => return None,
},
None => None,
},
_ => None,
}
let file = path
.file_stem()
.expect("cannot get the file name")
.to_str()?;
file.strip_prefix(prefix)
.and_then(|id| usize::from_str(id).ok())
}
4 changes: 2 additions & 2 deletions tests/kvs_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ fn test_recovery() {
#[test]
fn concurrent_insert() {
let _ = env_logger::builder().is_test(true).try_init();
const NUM_INSERTION: usize = 1025;
const NUM_THREADS: usize = 4;
const NUM_INSERTION: usize = 2345;
const NUM_THREADS: usize = 8;
const TABLE_NAME: &str = "concurrency_test";
let config = Config::new();
let kvs = Arc::new(amphis::kvs::KVS::new(TABLE_NAME, config).expect("failed to start Amphis"));
Expand Down

0 comments on commit 1ad333e

Please sign in to comment.