Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
yito88 committed Nov 4, 2023
1 parent 1ad333e commit 40e70cc
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 94 deletions.
25 changes: 16 additions & 9 deletions .github/workflows/amphis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,29 @@ env:

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4

- uses: actions/cache@v2
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
- uses: Swatinem/rust-cache@v2

- name: Build
run: cargo build --verbose

- name: Run tests
run: cargo test --verbose

fmt-clippy:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- uses: Swatinem/rust-cache@v2

- name: Build
run: cargo fmt --all -- --check

- name: Run tests
run: cargo clippy -- -D warnings
3 changes: 1 addition & 2 deletions src/flush_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,10 @@ impl FlushWriter {
// it is enough to sort only kv_pairs since all leaves are ordered
kv_pairs.sort();
for (key, value) in kv_pairs {
writer.write(&data_util::format_data_with_crc(&key, &value))?;
filter.set(&key);
index.insert(&key, offset);

offset += data_util::get_data_size(key.len(), value.len());
writer.write_all(&data_util::format_data_with_crc(&key, &value))?;
}
}
table_file.sync_all()?;
Expand Down
22 changes: 11 additions & 11 deletions src/fptree/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ impl Node for Inner {
}
}

fn get_child(&self, key: &Vec<u8>) -> Option<Arc<RwLock<dyn Node + Send + Sync>>> {
fn get_child(&self, key: &[u8]) -> Option<Arc<RwLock<dyn Node + Send + Sync>>> {
trace!("check an inner - {} by key {:?}", self, key);
let child_idx = match self.keys.binary_search(key) {
let child_idx = match self.keys.binary_search(&key.to_vec()) {
Ok(i) => i + 1,
Err(i) => i,
};
Expand All @@ -58,17 +58,17 @@ impl Node for Inner {

fn insert(
&mut self,
key: &Vec<u8>,
inserted_key: &Vec<u8>,
key: &[u8],
inserted_key: &[u8],
) -> Result<Option<Vec<u8>>, std::io::Error> {
let mut ret: Option<Vec<u8>> = None;
let child = self.get_child(key).unwrap();
let new_child = child.read().unwrap().get_next().unwrap();

match self.keys.binary_search(inserted_key) {
match self.keys.binary_search(&inserted_key.to_vec()) {
Ok(_) => panic!("should not reach here"),
Err(i) => {
self.keys.insert(i, inserted_key.clone());
self.keys.insert(i, inserted_key.to_vec());
if i + 1 >= self.children.len() {
self.children.push(new_child.clone());
} else {
Expand All @@ -84,7 +84,7 @@ impl Node for Inner {
Ok(ret)
}

fn get(&self, key: &Vec<u8>) -> Result<Option<Vec<u8>>, std::io::Error> {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, std::io::Error> {
match self.get_child(key) {
Some(c) => c.read().unwrap().get(key),
None => Ok(None),
Expand Down Expand Up @@ -162,20 +162,20 @@ mod tests {
fn get_next(&self) -> Option<Arc<RwLock<dyn Node + Send + Sync>>> {
Some(Arc::new(RwLock::new(MockLeaf { val: 0 })))
}
fn get_child(&self, _key: &Vec<u8>) -> Option<Arc<RwLock<dyn Node + Send + Sync>>> {
fn get_child(&self, _key: &[u8]) -> Option<Arc<RwLock<dyn Node + Send + Sync>>> {
None
}
fn insert(
&mut self,
_key: &Vec<u8>,
_value: &Vec<u8>,
_key: &[u8],
_value: &[u8],
) -> Result<Option<Vec<u8>>, std::io::Error> {
Ok(None)
}
fn may_need_split(&self) -> bool {
false
}
fn get(&self, key: &Vec<u8>) -> Result<Option<Vec<u8>>, std::io::Error> {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, std::io::Error> {
if *key == "key".as_bytes().to_vec() {
Ok(Some(format!("value{}", self.val).as_bytes().to_vec()))
} else {
Expand Down
28 changes: 13 additions & 15 deletions src/fptree/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ cfg_if::cfg_if! {
use super::leaf_manager::{LeafHeader, INITIAL_TAIL_OFFSET, NUM_SLOT};
use super::node::Node;

type KvPair = (Vec<u8>, Vec<u8>, usize);

pub struct Leaf {
leaf_manager: Arc<RwLock<LeafManager>>,
header: LeafHeader,
Expand Down Expand Up @@ -42,27 +44,23 @@ impl Node for Leaf {
}
}

fn get_child(&self, _key: &Vec<u8>) -> Option<Arc<RwLock<dyn Node + Send + Sync>>> {
fn get_child(&self, _key: &[u8]) -> Option<Arc<RwLock<dyn Node + Send + Sync>>> {
None
}

fn may_need_split(&self) -> bool {
self.header.need_split()
}

fn insert(
&mut self,
key: &Vec<u8>,
value: &Vec<u8>,
) -> Result<Option<Vec<u8>>, std::io::Error> {
fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>, std::io::Error> {
let mut ret: Option<Vec<u8>> = None;

self.invalidate_data(key)?;

if self.header.need_split() {
let split_key = self.split()?;
let new_leaf = self.get_next().expect("no next leaf");
if split_key < *key {
if split_key.as_slice() < key {
self.commit()?;

// TODO: when the new leaf is split
Expand Down Expand Up @@ -100,7 +98,7 @@ impl Node for Leaf {
Ok(ret)
}

fn get(&self, key: &Vec<u8>) -> Result<Option<Vec<u8>>, std::io::Error> {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, std::io::Error> {
trace!("Read from Leaf: {}", self);
for slot in self.get_existing_slots(key) {
let (page_id, data_offset, key_size, value_size) = self.header.get_kv_info(slot);
Expand Down Expand Up @@ -175,8 +173,8 @@ impl Leaf {
self.leaf_manager.clone()
}

pub fn get_kv_pairs(&self) -> Result<Vec<(Vec<u8>, Vec<u8>, usize)>, std::io::Error> {
let mut kv_pairs: Vec<(Vec<u8>, Vec<u8>, usize)> = Vec::with_capacity(NUM_SLOT);
pub fn get_kv_pairs(&self) -> Result<Vec<KvPair>, std::io::Error> {
let mut kv_pairs: Vec<KvPair> = Vec::with_capacity(NUM_SLOT);

for slot in 0..NUM_SLOT {
if self.header.is_slot_set(slot) {
Expand All @@ -195,7 +193,7 @@ impl Leaf {
Ok(kv_pairs)
}

fn calc_key_hash(&self, key: &Vec<u8>) -> u8 {
fn calc_key_hash(&self, key: &[u8]) -> u8 {
let mut hasher = DefaultHasher::new();
for b in key {
hasher.write_u8(*b);
Expand All @@ -204,7 +202,7 @@ impl Leaf {
hasher.finish() as u8
}

fn get_existing_slots(&self, key: &Vec<u8>) -> Vec<usize> {
fn get_existing_slots(&self, key: &[u8]) -> Vec<usize> {
let mut slots = Vec::new();
let hash = self.calc_key_hash(key);
for (slot, fp) in self.header.get_fingerprints().iter().enumerate() {
Expand All @@ -216,7 +214,7 @@ impl Leaf {
slots
}

fn invalidate_data(&mut self, key: &Vec<u8>) -> Result<(), std::io::Error> {
fn invalidate_data(&mut self, key: &[u8]) -> Result<(), std::io::Error> {
for slot in self.get_existing_slots(key) {
let (page_id, data_offset, key_size, value_size) = self.header.get_kv_info(slot);
let (actual_key, _value) = self.leaf_manager.read().unwrap().read_data(
Expand All @@ -238,8 +236,8 @@ impl Leaf {
&mut self,
slot: usize,
tail_offset: usize,
key: &Vec<u8>,
value: &Vec<u8>,
key: &[u8],
value: &[u8],
) {
let offset = self.header.get_tail_offset();
self.header.set_slot(slot);
Expand Down
8 changes: 4 additions & 4 deletions src/fptree/leaf_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl LeafManager {
.expect("the appended header doesn't exist");
}
last_header.set_ext(new_id);
self.commit_header(last_id, &&last_header)?;
self.commit_header(last_id, &last_header)?;

Ok(new_id)
}
Expand Down Expand Up @@ -181,8 +181,8 @@ impl LeafManager {
&self,
id: usize,
offset: usize,
key: &Vec<u8>,
value: &Vec<u8>,
key: &[u8],
value: &[u8],
) -> Result<Option<usize>, std::io::Error> {
let data_size = data_util::get_data_size(key.len(), value.len());
let aligned_tail = offset + data_util::round_up_size(data_size);
Expand All @@ -197,7 +197,7 @@ impl LeafManager {
.map_mut(&self.leaves_file)?
};

let data = data_util::format_data_with_crc(&key, &value);
let data = data_util::format_data_with_crc(key, value);
mmap.copy_from_slice(&data);
mmap.flush()?;

Expand Down
19 changes: 9 additions & 10 deletions src/fptree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ impl FPTree {

fn split_root(
&self,
key: Vec<u8>,
key: &[u8],
mut locked_root: RwLockWriteGuard<Arc<RwLock<dyn Node + Send + Sync>>>,
locked_new_child: Arc<RwLock<dyn Node + Send + Sync>>,
) {
debug!("Root split: {:?}", key);
let mut new_root = Inner::new();
new_root.set_root(true);
new_root.add_key(key);
new_root.add_key(key.to_vec());
new_root.add_child(locked_root.clone());
new_root.add_child(locked_new_child.clone());
*locked_root = Arc::new(RwLock::new(new_root));
Expand All @@ -68,7 +68,7 @@ impl FPTree {
*count += 1;
}

pub fn put(&self, key: &Vec<u8>, value: &Vec<u8>) -> Result<(), std::io::Error> {
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), std::io::Error> {
// Lock the pointer to the root since it might be updated
let locked_root = self.root_ptr.write().unwrap();

Expand All @@ -88,8 +88,7 @@ impl FPTree {

let mut locked_nodes = Vec::new();
let mut is_root_locked = true;
for i in 0..nodes.len() {
let locked_node = nodes[i].write().unwrap();
for locked_node in nodes.iter().map(|node| node.write().unwrap()) {
if !locked_node.may_need_split() {
is_root_locked = false;
locked_nodes.clear();
Expand All @@ -99,15 +98,15 @@ impl FPTree {
drop(lock);

// Phase2: Insert split keys and a value
let mut inserted = value.clone();
let mut inserted = value.to_vec();
if is_root_locked {
while let Some(mut locked_node) = locked_nodes.pop() {
if let Some(split_key) = locked_node.insert(key, &inserted)? {
inserted = split_key.clone();
inserted = split_key;
if locked_node.is_root() {
locked_node.set_root(false);
let new_child = locked_node.get_next().unwrap();
self.split_root(inserted, locked_root, new_child);
self.split_root(&inserted, locked_root, new_child);
return Ok(());
}
} else {
Expand All @@ -128,7 +127,7 @@ impl FPTree {
Ok(())
}

pub fn get(&self, key: &Vec<u8>) -> Result<Option<Vec<u8>>, std::io::Error> {
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, std::io::Error> {
let mut node = self.root_ptr.read().unwrap().clone();
loop {
let n = node.clone();
Expand All @@ -141,7 +140,7 @@ impl FPTree {
}
}

pub fn delete(&self, key: &Vec<u8>) -> Result<(), std::io::Error> {
pub fn delete(&self, key: &[u8]) -> Result<(), std::io::Error> {
// just add a tombstone
self.put(key, &Vec::new())
}
Expand Down
7 changes: 3 additions & 4 deletions src/fptree/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ pub trait Node {
fn set_root(&mut self, is_root: bool);
fn is_leaf(&self) -> bool;
fn get_next(&self) -> Option<Arc<RwLock<dyn Node + Send + Sync>>>;
fn get_child(&self, key: &Vec<u8>) -> Option<Arc<RwLock<dyn Node + Send + Sync>>>;
fn get_child(&self, key: &[u8]) -> Option<Arc<RwLock<dyn Node + Send + Sync>>>;
fn may_need_split(&self) -> bool;
fn insert(&mut self, key: &Vec<u8>, value: &Vec<u8>)
-> Result<Option<Vec<u8>>, std::io::Error>;
fn get(&self, key: &Vec<u8>) -> Result<Option<Vec<u8>>, std::io::Error>;
fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>, std::io::Error>;
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, std::io::Error>;
fn split(&mut self) -> Result<Vec<u8>, std::io::Error>;
fn commit(&self) -> Result<(), std::io::Error>;
}
8 changes: 4 additions & 4 deletions src/fptree_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl FPTreeManager {
>= self.config.get_root_split_threshold()
}

pub fn put(&self, key: &Vec<u8>, value: &Vec<u8>) -> Result<(), std::io::Error> {
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), std::io::Error> {
let locked_new = self.new_fptree_ptr.read().unwrap();
match &*locked_new {
Some(n) => n.read().unwrap().put(key, value),
Expand All @@ -57,7 +57,7 @@ impl FPTreeManager {
}
}

pub fn get(&self, key: &Vec<u8>) -> Result<Option<Vec<u8>>, std::io::Error> {
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, std::io::Error> {
let mut result = None;
// TODO: concurrenct read
let locked_new = self.new_fptree_ptr.read().unwrap();
Expand All @@ -72,7 +72,7 @@ impl FPTreeManager {
Ok(result)
}

pub fn delete(&self, key: &Vec<u8>) -> Result<(), std::io::Error> {
pub fn delete(&self, key: &[u8]) -> Result<(), std::io::Error> {
let locked_new = self.new_fptree_ptr.read().unwrap();
match &*locked_new {
Some(n) => n.read().unwrap().delete(key),
Expand All @@ -93,7 +93,7 @@ impl FPTreeManager {
}

let mut locked_new = self.new_fptree_ptr.write().unwrap();
if let Some(_) = &*locked_new {
if (*locked_new).is_some() {
// The flush is in progress
return Ok(None);
}
Expand Down
Loading

0 comments on commit 40e70cc

Please sign in to comment.