Skip to content

Commit

Permalink
configure flush threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
yito88 committed Oct 1, 2023
1 parent 6c37373 commit 118565b
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 51 deletions.
14 changes: 6 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,19 @@ version = "0.1.0"
authors = ["yito88 <[email protected]>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bincode = "1.3.1"
bloomfilter = "1.0.3"
config = "0.11"
cfg-if = "0.1.10"
crc = "1.8.1"
crossbeam-channel = "0.5.8"
env_logger = "0.7.1"
log = "0.4.11"
memmap = "0.7.0"
bincode = "1.3.1"
mockall_double = "0.2.0"
serde = { version = "1.0.115", features = ["derive"] }
crc = "1.8.1"
thiserror = "1.0.20"
log = "0.4.11"
cfg-if = "0.1.10"
env_logger = "0.7.1"
mockall_double = "0.2.0"

[dev-dependencies]
mockall = "0.7.2"
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,11 @@ An embedded key-value store
- Others
- [ ] Error handling
- [ ] Backgound thread

# Threshold for flushing
Basically, the number of keys triggers flushing (Converting FPTree to SSTable) because the number of the root split times on the FPTree is the threshold.
Where you insert K keys and the N split happens, the relation between K and N would be:
```math
8 \times (3^{n+1} + 1)
```
If you set the `root_split_threshold` to 6, flush would happen every 17,504 insertions.
1 change: 1 addition & 0 deletions config.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
leaf_dir = 'data'
table_dir = 'data'
root_split_threshold = 4
bloom_filter_items_count = 8192
bloom_filter_fp_rate = 0.01
23 changes: 9 additions & 14 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const CONFIG_FILE: &str = "config.toml";
pub struct Config {
leaf_dir: String,
table_dir: String,
root_split_threshold: usize,
bloom_filter_items_count: usize,
bloom_filter_fp_rate: f64,
}
Expand All @@ -16,6 +17,7 @@ impl Default for Config {
Self {
leaf_dir: "data".to_owned(),
table_dir: "data".to_owned(),
root_split_threshold: 6,
bloom_filter_items_count: 8192,
bloom_filter_fp_rate: 0.01,
}
Expand All @@ -25,20 +27,6 @@ impl Default for Config {
impl Config {
pub fn new() -> Self {
let mut config = config::Config::default();

config
.set_default("leaf_dir", "data")
.expect("cannot parse the key");
config
.set_default("table_dir", "data")
.expect("cannot parse the key");
config
.set_default("bloom_filter_items_count", 8192)
.expect("cannot parse the key");
config
.set_default("bloom_filter_fp_rate", 0.01)
.expect("cannot parse the key");

if Path::new(CONFIG_FILE).exists() {
config
.merge(config::File::with_name(CONFIG_FILE))
Expand All @@ -59,6 +47,9 @@ impl Config {
config
.set_default("table_dir", path_str)
.expect("cannot parse the key");
config
.set_default("root_split_threshold", 6)
.expect("cannot parse the key");
config
.set_default("bloom_filter_items_count", 8192)
.expect("cannot parse the key");
Expand All @@ -85,6 +76,10 @@ impl Config {
format!("{}/sstable-{}.amph", self.get_table_dir_path(name), id)
}

pub fn get_root_split_threshold(&self) -> usize {
self.root_split_threshold
}

pub fn get_filter_items_count(&self) -> usize {
self.bloom_filter_items_count
}
Expand Down
6 changes: 3 additions & 3 deletions src/fptree/inner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use log::{debug, trace};
use log::trace;
use std::sync::Arc;
use std::sync::RwLock;

Expand Down Expand Up @@ -40,7 +40,7 @@ impl Node for Inner {
}

fn get_child(&self, key: &Vec<u8>) -> Option<Arc<RwLock<dyn Node + Send + Sync>>> {
debug!("check an inner - {} by key {:?}", self, key);
trace!("check an inner - {} by key {:?}", self, key);
let child_idx = match self.keys.binary_search(key) {
Ok(i) => i + 1,
Err(i) => i,
Expand Down Expand Up @@ -108,7 +108,7 @@ impl Node for Inner {
}
trace!("split existing inner: {}", self);
trace!("new inner: {}", new_inner);
debug!("split_key: {:?}", split_key.clone());
trace!("split_key: {:?}", split_key.clone());
self.next = Some(Arc::new(RwLock::new(new_inner)));

Ok(split_key)
Expand Down
4 changes: 2 additions & 2 deletions src/fptree/leaf.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use log::{debug, trace};
use log::trace;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
use std::sync::{Arc, RwLock};
Expand Down Expand Up @@ -144,7 +144,7 @@ impl Node for Leaf {
self.next = Some(Arc::new(RwLock::new(new_leaf)));

trace!("split existing leaf: {}", self);
debug!("split_key: {:?}", split_key.clone());
trace!("split_key: {:?}", split_key.clone());

Ok(split_key)
}
Expand Down
18 changes: 7 additions & 11 deletions src/fptree_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ use std::sync::{Arc, RwLock};
use crate::config::Config;
use crate::fptree::{FPTree, Leaf};

// TODO: parameterize
const SPLIT_THRESHOLD: usize = 6;

pub struct FPTreeManager {
name: String,
config: Config,
Expand All @@ -32,18 +29,16 @@ impl FPTreeManager {
}

pub fn need_flush(&self) -> bool {
let new_fptree = self.new_fptree_ptr.read().unwrap();
let count = match &*new_fptree {
Some(new) => new.read().unwrap().get_root_split_count(),
None => self
// Flush has been already started when the new FPTree exists
self.new_fptree_ptr.read().unwrap().is_none()
&& self
.fptree_ptr
.read()
.unwrap()
.read()
.unwrap()
.get_root_split_count(),
};
count > SPLIT_THRESHOLD
.get_root_split_count()
>= self.config.get_root_split_threshold()
}

pub fn put(&self, key: &Vec<u8>, value: &Vec<u8>) -> Result<(), std::io::Error> {
Expand Down Expand Up @@ -88,6 +83,7 @@ impl FPTreeManager {
}
}

/// Check the triggered flush before starting flush and set the new FPTree
pub fn prepare_flush(&self) -> Result<Option<Arc<RwLock<Leaf>>>, std::io::Error> {
let locked_fptree_id = self.fptree_id.write().unwrap();

Expand Down Expand Up @@ -137,7 +133,7 @@ impl FPTreeManager {
*locked_new = None;
*locked_fptree_id += 1;
}
None => panic!("no new FPTree after a flush"),
None => unreachable!("No new FPTree when flushing"),
}

info!("completed flushing FPTree {}", *locked_fptree_id - 1);
Expand Down
1 change: 0 additions & 1 deletion src/kvs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ impl KVS {
if Path::new(&path).exists() {
// flush the exsting trees
for entry in std::fs::read_dir(path)? {
println!("DEBUG: {:?}", entry);
if let Some(fptree_id) = file_util::get_tree_id(&entry?.path()) {
debug!("found FPTree ID: {}", fptree_id);
let (table_id, filter, index) =
Expand Down
9 changes: 5 additions & 4 deletions src/sstable_manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bloomfilter::Bloom;
use log::debug;
use log::{debug, trace};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fs::File;
Expand Down Expand Up @@ -80,15 +80,16 @@ impl SstableManager {

pub fn get(&self, key: &Vec<u8>) -> Result<Option<Vec<u8>>, std::io::Error> {
for (table_id, filter) in self.filters.read().unwrap().iter().rev() {
debug!(
trace!(
"Check the bloom filter of SSTable {} with {:?}",
table_id, key
table_id,
key
);
if !filter.check(key) {
continue;
}

debug!("Read from SSTable {} with {:?}", table_id, key);
trace!("Read from SSTable {} with {:?}", table_id, key);
let indexes = self.indexes.read().unwrap();
let index = indexes.get(&table_id).unwrap();
let offset = index.get(key);
Expand Down
10 changes: 7 additions & 3 deletions src/util/file_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ pub fn open_file(file_path: &str) -> Result<(File, bool), std::io::Error> {
let file = match OpenOptions::new()
.read(true)
.append(true)
.create(true)
.create(false)
.open(&file_path)
{
Ok(f) => f,
Err(e) => match e.kind() {
ErrorKind::NotFound => {
let f = File::create(&file_path)?;
let f = OpenOptions::new()
.read(true)
.append(true)
.create(true)
.open(&file_path)?;
f.sync_all()?;
warn!("a new file {} is created", file_path);
warn!("New file {} is created", file_path);
is_created = true;
f
}
Expand Down
6 changes: 1 addition & 5 deletions tests/kvs_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,7 @@ fn concurrent_insert() {
match each.get(&key.as_bytes().to_vec()).unwrap() {
Some(value) => {
let actual = String::from_utf8(value.to_vec()).unwrap();

match expected {
_ if expected == actual => println!("Get result {}", actual),
_ => panic!("expected: {}, actual: {}", expected, actual),
}
assert_eq!(actual, expected);
}
None => panic!("expected: {}, actual: None", expected),
};
Expand Down

0 comments on commit 118565b

Please sign in to comment.