diff --git a/.github/workflows/amphis.yml b/.github/workflows/amphis.yml index 02f3e98..9c8f208 100644 --- a/.github/workflows/amphis.yml +++ b/.github/workflows/amphis.yml @@ -7,22 +7,29 @@ env: jobs: build: - runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - - uses: actions/cache@v2 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + - uses: Swatinem/rust-cache@v2 - name: Build run: cargo build --verbose - name: Run tests run: cargo test --verbose + + fmt-clippy: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - uses: Swatinem/rust-cache@v2 + + - name: Build + run: cargo fmt --all -- --check + + - name: Run tests + run: cargo clippy -- -D warnings diff --git a/src/flush_writer.rs b/src/flush_writer.rs index 9cebb1c..c114335 100644 --- a/src/flush_writer.rs +++ b/src/flush_writer.rs @@ -139,11 +139,10 @@ impl FlushWriter { // it is enough to sort only kv_pairs since all leaves are ordered kv_pairs.sort(); for (key, value) in kv_pairs { - writer.write(&data_util::format_data_with_crc(&key, &value))?; filter.set(&key); index.insert(&key, offset); - offset += data_util::get_data_size(key.len(), value.len()); + writer.write_all(&data_util::format_data_with_crc(&key, &value))?; } } table_file.sync_all()?; diff --git a/src/fptree/inner.rs b/src/fptree/inner.rs index b376322..69f191c 100644 --- a/src/fptree/inner.rs +++ b/src/fptree/inner.rs @@ -39,9 +39,9 @@ impl Node for Inner { } } - fn get_child(&self, key: &Vec) -> Option>> { + fn get_child(&self, key: &[u8]) -> Option>> { trace!("check an inner - {} by key {:?}", self, key); - let child_idx = match self.keys.binary_search(key) { + let child_idx = match self.keys.binary_search(&key.to_vec()) { Ok(i) => i + 1, Err(i) => i, }; @@ -58,17 +58,17 @@ impl Node for Inner { fn insert( &mut self, - key: &Vec, - inserted_key: &Vec, + key: &[u8], + inserted_key: &[u8], ) -> Result>, std::io::Error> { let mut ret: Option> = None; let child = self.get_child(key).unwrap(); let new_child = child.read().unwrap().get_next().unwrap(); - match self.keys.binary_search(inserted_key) { + match self.keys.binary_search(&inserted_key.to_vec()) { Ok(_) => panic!("should not reach here"), Err(i) => { - self.keys.insert(i, inserted_key.clone()); + self.keys.insert(i, inserted_key.to_vec()); if i + 1 >= self.children.len() { self.children.push(new_child.clone()); } else { @@ -84,7 +84,7 @@ impl Node for Inner { Ok(ret) } - fn get(&self, key: &Vec) -> Result>, std::io::Error> { + fn get(&self, key: &[u8]) -> Result>, std::io::Error> { match self.get_child(key) { Some(c) => c.read().unwrap().get(key), None => Ok(None), @@ -162,20 +162,20 @@ mod tests { fn get_next(&self) -> Option>> { Some(Arc::new(RwLock::new(MockLeaf { val: 0 }))) } - fn get_child(&self, _key: &Vec) -> Option>> { + fn get_child(&self, _key: &[u8]) -> Option>> { None } fn insert( &mut self, - _key: &Vec, - _value: &Vec, + _key: &[u8], + _value: &[u8], ) -> Result>, std::io::Error> { Ok(None) } fn may_need_split(&self) -> bool { false } - fn get(&self, key: &Vec) -> Result>, std::io::Error> { + fn get(&self, key: &[u8]) -> Result>, std::io::Error> { if *key == "key".as_bytes().to_vec() { Ok(Some(format!("value{}", self.val).as_bytes().to_vec())) } else { diff --git a/src/fptree/leaf.rs b/src/fptree/leaf.rs index 010435b..4d148e3 100644 --- a/src/fptree/leaf.rs +++ b/src/fptree/leaf.rs @@ -13,6 +13,8 @@ cfg_if::cfg_if! { use super::leaf_manager::{LeafHeader, INITIAL_TAIL_OFFSET, NUM_SLOT}; use super::node::Node; +type KvPair = (Vec, Vec, usize); + pub struct Leaf { leaf_manager: Arc>, header: LeafHeader, @@ -42,7 +44,7 @@ impl Node for Leaf { } } - fn get_child(&self, _key: &Vec) -> Option>> { + fn get_child(&self, _key: &[u8]) -> Option>> { None } @@ -50,11 +52,7 @@ impl Node for Leaf { self.header.need_split() } - fn insert( - &mut self, - key: &Vec, - value: &Vec, - ) -> Result>, std::io::Error> { + fn insert(&mut self, key: &[u8], value: &[u8]) -> Result>, std::io::Error> { let mut ret: Option> = None; self.invalidate_data(key)?; @@ -62,7 +60,7 @@ impl Node for Leaf { if self.header.need_split() { let split_key = self.split()?; let new_leaf = self.get_next().expect("no next leaf"); - if split_key < *key { + if split_key.as_slice() < key { self.commit()?; // TODO: when the new leaf is split @@ -100,7 +98,7 @@ impl Node for Leaf { Ok(ret) } - fn get(&self, key: &Vec) -> Result>, std::io::Error> { + fn get(&self, key: &[u8]) -> Result>, std::io::Error> { trace!("Read from Leaf: {}", self); for slot in self.get_existing_slots(key) { let (page_id, data_offset, key_size, value_size) = self.header.get_kv_info(slot); @@ -175,8 +173,8 @@ impl Leaf { self.leaf_manager.clone() } - pub fn get_kv_pairs(&self) -> Result, Vec, usize)>, std::io::Error> { - let mut kv_pairs: Vec<(Vec, Vec, usize)> = Vec::with_capacity(NUM_SLOT); + pub fn get_kv_pairs(&self) -> Result, std::io::Error> { + let mut kv_pairs: Vec = Vec::with_capacity(NUM_SLOT); for slot in 0..NUM_SLOT { if self.header.is_slot_set(slot) { @@ -195,7 +193,7 @@ impl Leaf { Ok(kv_pairs) } - fn calc_key_hash(&self, key: &Vec) -> u8 { + fn calc_key_hash(&self, key: &[u8]) -> u8 { let mut hasher = DefaultHasher::new(); for b in key { hasher.write_u8(*b); @@ -204,7 +202,7 @@ impl Leaf { hasher.finish() as u8 } - fn get_existing_slots(&self, key: &Vec) -> Vec { + fn get_existing_slots(&self, key: &[u8]) -> Vec { let mut slots = Vec::new(); let hash = self.calc_key_hash(key); for (slot, fp) in self.header.get_fingerprints().iter().enumerate() { @@ -216,7 +214,7 @@ impl Leaf { slots } - fn invalidate_data(&mut self, key: &Vec) -> Result<(), std::io::Error> { + fn invalidate_data(&mut self, key: &[u8]) -> Result<(), std::io::Error> { for slot in self.get_existing_slots(key) { let (page_id, data_offset, key_size, value_size) = self.header.get_kv_info(slot); let (actual_key, _value) = self.leaf_manager.read().unwrap().read_data( @@ -238,8 +236,8 @@ impl Leaf { &mut self, slot: usize, tail_offset: usize, - key: &Vec, - value: &Vec, + key: &[u8], + value: &[u8], ) { let offset = self.header.get_tail_offset(); self.header.set_slot(slot); diff --git a/src/fptree/leaf_manager/mod.rs b/src/fptree/leaf_manager/mod.rs index f3e3559..d0ed392 100644 --- a/src/fptree/leaf_manager/mod.rs +++ b/src/fptree/leaf_manager/mod.rs @@ -85,7 +85,7 @@ impl LeafManager { .expect("the appended header doesn't exist"); } last_header.set_ext(new_id); - self.commit_header(last_id, &&last_header)?; + self.commit_header(last_id, &last_header)?; Ok(new_id) } @@ -181,8 +181,8 @@ impl LeafManager { &self, id: usize, offset: usize, - key: &Vec, - value: &Vec, + key: &[u8], + value: &[u8], ) -> Result, std::io::Error> { let data_size = data_util::get_data_size(key.len(), value.len()); let aligned_tail = offset + data_util::round_up_size(data_size); @@ -197,7 +197,7 @@ impl LeafManager { .map_mut(&self.leaves_file)? }; - let data = data_util::format_data_with_crc(&key, &value); + let data = data_util::format_data_with_crc(key, value); mmap.copy_from_slice(&data); mmap.flush()?; diff --git a/src/fptree/mod.rs b/src/fptree/mod.rs index 9d9da6a..7523e74 100644 --- a/src/fptree/mod.rs +++ b/src/fptree/mod.rs @@ -52,14 +52,14 @@ impl FPTree { fn split_root( &self, - key: Vec, + key: &[u8], mut locked_root: RwLockWriteGuard>>, locked_new_child: Arc>, ) { debug!("Root split: {:?}", key); let mut new_root = Inner::new(); new_root.set_root(true); - new_root.add_key(key); + new_root.add_key(key.to_vec()); new_root.add_child(locked_root.clone()); new_root.add_child(locked_new_child.clone()); *locked_root = Arc::new(RwLock::new(new_root)); @@ -68,7 +68,7 @@ impl FPTree { *count += 1; } - pub fn put(&self, key: &Vec, value: &Vec) -> Result<(), std::io::Error> { + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), std::io::Error> { // Lock the pointer to the root since it might be updated let locked_root = self.root_ptr.write().unwrap(); @@ -88,8 +88,7 @@ impl FPTree { let mut locked_nodes = Vec::new(); let mut is_root_locked = true; - for i in 0..nodes.len() { - let locked_node = nodes[i].write().unwrap(); + for locked_node in nodes.iter().map(|node| node.write().unwrap()) { if !locked_node.may_need_split() { is_root_locked = false; locked_nodes.clear(); @@ -99,15 +98,15 @@ impl FPTree { drop(lock); // Phase2: Insert split keys and a value - let mut inserted = value.clone(); + let mut inserted = value.to_vec(); if is_root_locked { while let Some(mut locked_node) = locked_nodes.pop() { if let Some(split_key) = locked_node.insert(key, &inserted)? { - inserted = split_key.clone(); + inserted = split_key; if locked_node.is_root() { locked_node.set_root(false); let new_child = locked_node.get_next().unwrap(); - self.split_root(inserted, locked_root, new_child); + self.split_root(&inserted, locked_root, new_child); return Ok(()); } } else { @@ -128,7 +127,7 @@ impl FPTree { Ok(()) } - pub fn get(&self, key: &Vec) -> Result>, std::io::Error> { + pub fn get(&self, key: &[u8]) -> Result>, std::io::Error> { let mut node = self.root_ptr.read().unwrap().clone(); loop { let n = node.clone(); @@ -141,7 +140,7 @@ impl FPTree { } } - pub fn delete(&self, key: &Vec) -> Result<(), std::io::Error> { + pub fn delete(&self, key: &[u8]) -> Result<(), std::io::Error> { // just add a tombstone self.put(key, &Vec::new()) } diff --git a/src/fptree/node.rs b/src/fptree/node.rs index b436020..f7b6af4 100644 --- a/src/fptree/node.rs +++ b/src/fptree/node.rs @@ -6,11 +6,10 @@ pub trait Node { fn set_root(&mut self, is_root: bool); fn is_leaf(&self) -> bool; fn get_next(&self) -> Option>>; - fn get_child(&self, key: &Vec) -> Option>>; + fn get_child(&self, key: &[u8]) -> Option>>; fn may_need_split(&self) -> bool; - fn insert(&mut self, key: &Vec, value: &Vec) - -> Result>, std::io::Error>; - fn get(&self, key: &Vec) -> Result>, std::io::Error>; + fn insert(&mut self, key: &[u8], value: &[u8]) -> Result>, std::io::Error>; + fn get(&self, key: &[u8]) -> Result>, std::io::Error>; fn split(&mut self) -> Result, std::io::Error>; fn commit(&self) -> Result<(), std::io::Error>; } diff --git a/src/fptree_manager.rs b/src/fptree_manager.rs index 30c8f23..c3776cf 100644 --- a/src/fptree_manager.rs +++ b/src/fptree_manager.rs @@ -41,7 +41,7 @@ impl FPTreeManager { >= self.config.get_root_split_threshold() } - pub fn put(&self, key: &Vec, value: &Vec) -> Result<(), std::io::Error> { + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), std::io::Error> { let locked_new = self.new_fptree_ptr.read().unwrap(); match &*locked_new { Some(n) => n.read().unwrap().put(key, value), @@ -57,7 +57,7 @@ impl FPTreeManager { } } - pub fn get(&self, key: &Vec) -> Result>, std::io::Error> { + pub fn get(&self, key: &[u8]) -> Result>, std::io::Error> { let mut result = None; // TODO: concurrenct read let locked_new = self.new_fptree_ptr.read().unwrap(); @@ -72,7 +72,7 @@ impl FPTreeManager { Ok(result) } - pub fn delete(&self, key: &Vec) -> Result<(), std::io::Error> { + pub fn delete(&self, key: &[u8]) -> Result<(), std::io::Error> { let locked_new = self.new_fptree_ptr.read().unwrap(); match &*locked_new { Some(n) => n.read().unwrap().delete(key), @@ -93,7 +93,7 @@ impl FPTreeManager { } let mut locked_new = self.new_fptree_ptr.write().unwrap(); - if let Some(_) = &*locked_new { + if (*locked_new).is_some() { // The flush is in progress return Ok(None); } diff --git a/src/kvs.rs b/src/kvs.rs index db3cecf..2f3bfff 100644 --- a/src/kvs.rs +++ b/src/kvs.rs @@ -58,11 +58,11 @@ impl KVS { }) } - pub fn put(&self, key: &Vec, value: &Vec) -> Result<(), std::io::Error> { + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), std::io::Error> { trace!( "Put K: {}, V: {}", - String::from_utf8(key.clone()).unwrap(), - String::from_utf8(value.clone()).unwrap() + String::from_utf8(key.to_vec()).unwrap(), + String::from_utf8(value.to_vec()).unwrap() ); self.fptree_manager.put(key, value)?; @@ -74,10 +74,10 @@ impl KVS { Ok(()) } - pub fn get(&self, key: &Vec) -> Result>, std::io::Error> { + pub fn get(&self, key: &[u8]) -> Result>, std::io::Error> { trace!( "Getting from K: {}", - String::from_utf8(key.clone()).unwrap() + String::from_utf8(key.to_vec()).unwrap() ); // TODO: concurrenct read @@ -98,10 +98,10 @@ impl KVS { } } - pub fn delete(&self, key: &Vec) -> Result<(), std::io::Error> { + pub fn delete(&self, key: &[u8]) -> Result<(), std::io::Error> { trace!( "Deleting from K: {}", - String::from_utf8(key.clone()).unwrap() + String::from_utf8(key.to_vec()).unwrap() ); self.fptree_manager.delete(key) diff --git a/src/sparse_index.rs b/src/sparse_index.rs index 0214fcd..84546c3 100644 --- a/src/sparse_index.rs +++ b/src/sparse_index.rs @@ -24,18 +24,18 @@ impl SparseIndex { self.table_id } - pub fn insert(&mut self, key: &Vec, offset: usize) { + pub fn insert(&mut self, key: &[u8], offset: usize) { if self.prev_offset == usize::MAX || offset - self.prev_offset >= LEAST_OFFSET { self.prev_offset = offset; - self.index.insert(key.clone(), offset); + self.index.insert(key.to_owned(), offset); } } // the offset should be always returned since the minimum key is inserted - pub fn get(&self, key: &Vec) -> usize { + pub fn get(&self, key: &[u8]) -> usize { match self.index.get(key) { Some(offset) => *offset, - None => *self.index.range(..key.clone()).last().unwrap().1, + None => *self.index.range(..key.to_vec()).last().unwrap().1, } } } diff --git a/src/sstable_manager.rs b/src/sstable_manager.rs index 293f33c..df04c14 100644 --- a/src/sstable_manager.rs +++ b/src/sstable_manager.rs @@ -32,6 +32,8 @@ struct BloomElements { bitmap: Vec, } +type TableInfo = (usize, Bloom>); + impl SstableManager { pub fn new(name: &str, config: Config) -> Result<(Self, usize), std::io::Error> { let path = config.get_table_dir_path(name); @@ -78,20 +80,20 @@ impl SstableManager { Ok(()) } - pub fn get(&self, key: &Vec) -> Result>, std::io::Error> { + pub fn get(&self, key: &[u8]) -> Result>, std::io::Error> { for (table_id, filter) in self.filters.read().unwrap().iter().rev() { trace!( "Check the bloom filter of SSTable {} with {:?}", table_id, key ); - if !filter.check(key) { + if !filter.check(&key.to_vec()) { continue; } trace!("Read from SSTable {} with {:?}", table_id, key); let indexes = self.indexes.read().unwrap(); - let index = indexes.get(&table_id).unwrap(); + let index = indexes.get(table_id).unwrap(); let offset = index.get(key); match self.get_from_table(key, *table_id, offset)? { Some(r) => return Ok(Some(r)), @@ -104,7 +106,7 @@ impl SstableManager { fn get_from_table( &self, - key: &Vec, + key: &[u8], table_id: usize, offset: usize, ) -> Result>, std::io::Error> { @@ -114,11 +116,10 @@ impl SstableManager { reader.seek(SeekFrom::Start(offset as u64))?; loop { - let cur_key; - match self.read_data(&mut reader)? { - Some(k) => cur_key = k, + let cur_key = match self.read_data(&mut reader)? { + Some(k) => k, None => return Ok(None), - } + }; let value = self.read_data(&mut reader)?; if cur_key == *key { @@ -136,10 +137,10 @@ impl SstableManager { let size = u32::from_le_bytes(size_buf) as usize; let mut data = vec![0_u8; size]; - reader.read(&mut data)?; + reader.read_exact(&mut data)?; let mut crc_buf = [0_u8; data_util::LEN_CRC]; - reader.read(&mut crc_buf)?; + reader.read_exact(&mut crc_buf)?; let crc = u32::from_le_bytes(crc_buf); data_util::check_crc(data.as_slice(), crc)?; @@ -165,7 +166,7 @@ impl SstableManager { data.extend(&encoded); data.extend(&data_util::calc_crc(&encoded).to_le_bytes()); - writer.write(&data)?; + writer.write_all(&data)?; Ok(()) } @@ -185,7 +186,7 @@ impl SstableManager { fn read_filter( &self, reader: &mut BufReader, - ) -> Result>)>, std::io::Error> { + ) -> Result, std::io::Error> { let mut size_buf = [0_u8; data_util::LEN_SIZE]; let len = reader.read(&mut size_buf)?; if len == 0 { @@ -194,10 +195,10 @@ impl SstableManager { let size = u32::from_le_bytes(size_buf) as usize; let mut data = vec![0_u8; size]; - reader.read(&mut data)?; + reader.read_exact(&mut data)?; let mut crc_buf = [0_u8; data_util::LEN_CRC]; - reader.read(&mut crc_buf)?; + reader.read_exact(&mut crc_buf)?; let crc = u32::from_le_bytes(crc_buf); data_util::check_crc(data.as_slice(), crc)?; @@ -242,7 +243,7 @@ impl SstableManager { data.extend(&encoded); data.extend(&data_util::calc_crc(&encoded).to_le_bytes()); - writer.write(&data)?; + writer.write_all(&data)?; Ok(()) } @@ -274,10 +275,10 @@ impl SstableManager { let size = u32::from_le_bytes(size_buf) as usize; let mut data = vec![0_u8; size]; - reader.read(&mut data)?; + reader.read_exact(&mut data)?; let mut crc_buf = [0_u8; data_util::LEN_CRC]; - reader.read(&mut crc_buf)?; + reader.read_exact(&mut crc_buf)?; let crc = u32::from_le_bytes(crc_buf); data_util::check_crc(data.as_slice(), crc)?; diff --git a/src/util/data_util.rs b/src/util/data_util.rs index bbbf3c8..5437de5 100644 --- a/src/util/data_util.rs +++ b/src/util/data_util.rs @@ -13,17 +13,19 @@ const LEN_REDUNDANCY: usize = LEN_SIZE + LEN_CRC; * | Size (4B) | Data | CRC (4B) | */ -pub fn format_data_with_crc(key: &Vec, value: &Vec) -> Vec { +pub fn format_data_with_crc(key: &[u8], value: &[u8]) -> Vec { let data_size = get_data_size(key.len(), value.len()); let mut data: Vec = Vec::with_capacity(data_size); + let crc = calc_crc(key).to_le_bytes(); data.extend(&(key.len() as u32).to_le_bytes()); data.extend(key); - data.extend(&calc_crc(key).to_le_bytes()); + data.extend(crc); + let crc = calc_crc(value).to_le_bytes(); data.extend(&(value.len() as u32).to_le_bytes()); data.extend(value); - data.extend(&calc_crc(value).to_le_bytes()); + data.extend(crc); data } @@ -54,7 +56,7 @@ pub fn get_bound_offset(key_size: usize) -> usize { key_size + LEN_REDUNDANCY } -pub fn calc_crc(data: &Vec) -> u32 { +pub fn calc_crc(data: &[u8]) -> u32 { let mut digest = crc32::Digest::new(crc32::IEEE); digest.write(data); @@ -62,7 +64,7 @@ pub fn calc_crc(data: &Vec) -> u32 { } pub fn check_crc(data: &[u8], crc: u32) -> Result<(), std::io::Error> { - if calc_crc(&data.to_vec()) == crc { + if calc_crc(data) == crc { Ok(()) } else { // TODO: replace with an amphis error diff --git a/src/util/file_util.rs b/src/util/file_util.rs index caa8a09..a45ce19 100644 --- a/src/util/file_util.rs +++ b/src/util/file_util.rs @@ -10,7 +10,7 @@ pub fn open_file(file_path: &str) -> Result<(File, bool), std::io::Error> { .read(true) .append(true) .create(false) - .open(&file_path) + .open(file_path) { Ok(f) => f, Err(e) => match e.kind() { @@ -19,7 +19,7 @@ pub fn open_file(file_path: &str) -> Result<(File, bool), std::io::Error> { .read(true) .append(true) .create(true) - .open(&file_path)?; + .open(file_path)?; f.sync_all()?; info!("New file {} is created", file_path); is_created = true;